Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/05 12:40:13 UTC
[5/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version (Make API changes in carbon to be compatible with spark 2.3)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 124413d..747b064 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -18,17 +18,14 @@
package org.apache.carbondata.stream;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
@@ -78,12 +75,11 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
/**
* Stream record reader
@@ -117,7 +113,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
// vectorized reader
private StructType outputSchema;
- private Object vectorProxy;
+ private CarbonVectorProxy vectorProxy;
private boolean isFinished = false;
// filter
@@ -143,9 +139,6 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
// InputMetricsStats
private InputMetricsStats inputMetricsStats;
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonStreamRecordReader.class.getName());
-
public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
QueryModel mdl, boolean useRawRow) {
this.isVectorReader = isVectorReader;
@@ -400,21 +393,15 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
return null;
}
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
- if (isVectorReader) {
- Method method = null;
- try {
- method = vectorProxy.getClass().getMethod("numRows");
- int value = (int) method.invoke(vectorProxy);
- if (inputMetricsStats != null) {
- inputMetricsStats.incrementRecordRead((long) value);
+ @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ if (isVectorReader) {
+ int value = vectorProxy.numRows();
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead((long) value);
+ }
+
+ return vectorProxy.getColumnarBatch();
}
- method = vectorProxy.getClass().getMethod("getColumnarBatch");
- return method.invoke(vectorProxy);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
if (inputMetricsStats != null) {
inputMetricsStats.incrementRecordRead(1L);
@@ -440,50 +427,39 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
return true;
}
- private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
- Constructor cons = null;
- // if filter is null and output projection is empty, use the row number of blocklet header
- int rowNum = 0;
- String methodName = "setNumRows";
- try {
- String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy";
- cons = CarbonStreamUtils.getConstructorWithReflection(vectorReaderClassName, MemoryMode.class,
- StructType.class, int.class);
- if (skipScanData) {
-
- int rowNums = header.getBlocklet_info().getNum_rows();
- vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP, outputSchema, rowNums);
- Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
- setNumRowsMethod.invoke(vectorProxy, rowNums);
- input.skipBlockletData(true);
- return rowNums > 0;
- }
- input.readBlockletData(header);
- vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP,outputSchema, input.getRowNums());
- if (null == filter) {
- while (input.hasNext()) {
- readRowFromStream();
- putRowToColumnBatch(rowNum++);
+ private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+ // if filter is null and output projection is empty, use the row number of blocklet header
+ if (skipScanData) {
+ int rowNums = header.getBlocklet_info().getNum_rows();
+ vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,rowNums);
+ vectorProxy.setNumRows(rowNums);
+ input.skipBlockletData(true);
+ return rowNums > 0;
}
- } else {
- try {
- while (input.hasNext()) {
- readRowFromStream();
- if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
- putRowToColumnBatch(rowNum++);
+
+ input.readBlockletData(header);
+ vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,input.getRowNums());
+ int rowNum = 0;
+ if (null == filter) {
+ while (input.hasNext()) {
+ readRowFromStream();
+ putRowToColumnBatch(rowNum++);
+ }
+ } else {
+ try {
+ while (input.hasNext()) {
+ readRowFromStream();
+ if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+ putRowToColumnBatch(rowNum++);
+ }
+ }
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in vector reader", e);
}
- }
- } catch (FilterUnsupportedException e) {
- throw new IOException("Failed to filter row in vector reader", e);
}
- }
- Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
- setNumRowsMethod.invoke(vectorProxy, rowNum);
- } catch (Exception e) {
- throw new IOException("Failed to fill row in vector reader", e);
+ vectorProxy.setNumRows(rowNum);
+ return rowNum > 0;
}
- return rowNum > 0;
- }
private void readRowFromStream() {
input.nextRow();
@@ -719,43 +695,24 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
}
}
- private void putRowToColumnBatch(int rowId) {
- Class<?>[] paramTypes = {int.class, Object.class, int.class};
- Method putRowToColumnBatch = null;
- try {
- putRowToColumnBatch = vectorProxy.getClass().getMethod("putRowToColumnBatch", paramTypes);
+ private void putRowToColumnBatch(int rowId) {
+ for (int i = 0; i < projection.length; i++) {
+ Object value = outputValues[i];
+ vectorProxy.putRowToColumnBatch(rowId,value,i);
- } catch (Exception e) {
- LOGGER.error(
- "Unable to put the row in the vector" + "rowid: " + rowId + e);
- }
- for (int i = 0; i < projection.length; i++) {
- Object value = outputValues[i];
- try {
- putRowToColumnBatch.invoke(vectorProxy, rowId, value, i);
- } catch (Exception e) {
- LOGGER.error(
- "Unable to put the row in the vector" + "rowid: " + rowId + e);
- }
+ }
}
- }
- @Override public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override public void close() throws IOException {
- if (null != input) {
- input.close();
+ @Override public float getProgress() throws IOException, InterruptedException {
+ return 0;
}
- if (null != vectorProxy) {
- try {
- Method closeMethod = vectorProxy.getClass().getMethod("close");
- closeMethod.invoke(vectorProxy);
- } catch (Exception e) {
- LOGGER.error(
- "Unable to close the stream vector reader" + e);
- }
+
+ @Override public void close() throws IOException {
+ if (null != input) {
+ input.close();
+ }
+ if (null != vectorProxy) {
+ vectorProxy.close();
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 6f038eb..41fc013 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -20,7 +20,6 @@ import java.lang.Long
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.util.TaskMetricsMap
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d433470..450ead1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -21,16 +21,11 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types.{StringType, TimestampType}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
case class CarbonDictionaryCatalystDecoder(
@@ -125,22 +120,40 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
object CountStarPlan {
type ReturnType = (mutable.MutableList[Attribute], LogicalPlan)
- /**
- * It fill count star query attribute.
- */
- private def fillCountStarAttribute(
- expr: Expression,
- outputColumns: mutable.MutableList[Attribute]) {
- expr match {
- case par@Alias(_, _) =>
- val head = par.children.head.children.head
- head match {
- case count: Count if count.children.head.isInstanceOf[Literal] =>
- outputColumns += par.toAttribute
- case _ =>
- }
- }
- }
+ /**
+ * It fill count star query attribute.
+ * 2.2.1 plan
+ * Aggregate [count(1) AS count(1)#30L]
+ * +- Project
+ *
+ *2.3.0 plan
+ * Aggregate [cast(count(1) as string) AS count(1)#29]
+ * +- Project
+ */
+ private def fillCountStarAttribute(
+ expr: Expression,
+ outputColumns: mutable.MutableList[Attribute]) {
+ expr match {
+ case par@Alias(cast: Cast, _) =>
+ if (cast.child.isInstanceOf[AggregateExpression]) {
+ val head = cast.child.children.head
+ head match {
+ case count: Count if count.children.head.isInstanceOf[Literal] =>
+ outputColumns += par.toAttribute
+ case _ =>
+ }
+ }
+ case par@Alias(child, _) =>
+ if (child.isInstanceOf[AggregateExpression]) {
+ val head = child.children.head
+ head match {
+ case count: Count if count.children.head.isInstanceOf[Literal] =>
+ outputColumns += par.toAttribute
+ case _ =>
+ }
+ }
+ }
+ }
def unapply(plan: LogicalPlan): Option[ReturnType] = {
plan match {
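
For readers comparing the two plan shapes described in the CountStarPlan comment above, here is a minimal spark-shell style sketch (not part of this commit; it only assumes the standard Catalyst constructors) that builds both alias forms the new fillCountStarAttribute has to accept:

    import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal}
    import org.apache.spark.sql.catalyst.expressions.aggregate.Count
    import org.apache.spark.sql.types.StringType

    // count(1) as an AggregateExpression, the head child in both plan shapes
    val countStar = Count(Literal(1)).toAggregateExpression()

    // 2.2.1 shape: Alias directly over the aggregate expression
    val shape221 = Alias(countStar, "count(1)")()

    // 2.3.0 shape: Alias over a Cast of the aggregate expression to string
    val shape230 = Alias(Cast(countStar, StringType), "count(1)")()
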
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 9b78db0..ac8eb64 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -64,7 +66,13 @@ case class CarbonCountStar(
carbonTable.getTableName,
Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
carbonTable)
- val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
+ val valueRaw =
+ attributesRaw.head.dataType match {
+ case StringType => Seq(UTF8String.fromString(Long.box(rowCount).toString)).toArray
+ .asInstanceOf[Array[Any]]
+ case _ => Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]
+ }
+ val value = new GenericInternalRow(valueRaw)
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
val row = if (outUnsafeRows) unsafeProjection(value) else value
sparkContext.parallelize(Seq(row))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 96a8162..d6117de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -356,12 +356,7 @@ object CarbonSession {
// Register a successfully instantiated context to the singleton. This should be at the
// end of the class definition so that the singleton is updated only if there is no
// exception in the construction of the instance.
- sparkContext.addSparkListener(new SparkListener {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
- SparkSession.setDefaultSession(null)
- SparkSession.sqlListener.set(null)
- }
- })
+ CarbonToSparkAdapater.addSparkListener(sparkContext)
session.streams.addListener(new CarbonStreamingQueryListener(session))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 6312746..1ff9bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -34,8 +34,6 @@ case class CustomDeterministicExpression(nonDt: Expression ) extends Expression
override def children: Seq[Expression] = Seq()
- override def deterministic: Boolean = true
-
def childexp : Expression = nonDt
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 516f9af..52801b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -884,17 +884,21 @@ case class CarbonLoadDataCommand(
// datatype is always int
val column = table.getColumnByName(table.getTableName, attr.name)
if (column.hasEncoding(Encoding.DICTIONARY)) {
- AttributeReference(
- attr.name,
+ CarbonToSparkAdapater.createAttributeReference(attr.name,
IntegerType,
attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ attr.metadata,
+ attr.exprId,
+ attr.qualifier,
+ attr)
} else if (attr.dataType == TimestampType || attr.dataType == DateType) {
- AttributeReference(
- attr.name,
+ CarbonToSparkAdapater.createAttributeReference(attr.name,
LongType,
attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ attr.metadata,
+ attr.exprId,
+ attr.qualifier,
+ attr)
} else {
attr
}
@@ -1095,7 +1099,8 @@ case class CarbonLoadDataCommand(
CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
hdfsRelation.schema.toAttributes,
- Some(catalogTable))
+ Some(catalogTable),
+ false)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 1fcba3e..8f128fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -48,6 +49,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
+
/**
* Carbon specific optimization for late decode (convert dictionary key to value as late as
* possible), which can improve the aggregation performance and reduce memory usage
@@ -55,17 +57,33 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val PUSHED_FILTERS = "PushedFilters"
+ /*
+ Spark 2.3.1 plan there can be case of multiple projections like below
+ Project [substring(name, 1, 2)#124, name#123, tupleId#117, cast(rand(-6778822102499951904)#125
+ as string) AS rand(-6778822102499951904)#137]
+ +- Project [substring(name#123, 1, 2) AS substring(name, 1, 2)#124, name#123, UDF:getTupleId()
+ AS tupleId#117,
+ customdeterministicexpression(rand(-6778822102499951904)) AS rand(-6778822102499951904)#125]
+ +- Relation[imei#118,age#119,task#120L,num#121,level#122,name#123]
+ CarbonDatasourceHadoopRelation []
+ */
def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case PhysicalOperation(projects, filters, l: LogicalRelation)
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- pruneFilterProject(
- l,
- projects,
- filters,
- (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
- a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil
+ // In Spark 2.3.1 there is case of multiple projections like below
+ // if 1 projection is failed then need to continue to other
+ try {
+ pruneFilterProject(
+ l,
+ projects,
+ filters,
+ (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
+ a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil
+ } catch {
+ case e: CarbonPhysicalPlanException => Nil
+ }
case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
!CarbonDictionaryDecoder.
@@ -157,17 +175,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
if (names.nonEmpty) {
val partitionSet = AttributeSet(names
.map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
- val partitionKeyFilters =
- ExpressionSet(ExpressionSet(filterPredicates).filter(_.references.subsetOf(partitionSet)))
+ val partitionKeyFilters = CarbonToSparkAdapater
+ .getPartitionKeyFilter(partitionSet, filterPredicates)
// Update the name with lower case as it is case sensitive while getting partition info.
val updatedPartitionFilters = partitionKeyFilters.map { exp =>
exp.transform {
case attr: AttributeReference =>
- AttributeReference(
+ CarbonToSparkAdapater.createAttributeReference(
attr.name.toLowerCase,
attr.dataType,
attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ attr.metadata,
+ attr.exprId,
+ attr.qualifier,
+ attr)
}
}
partitions =
@@ -224,7 +245,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
}
- val (unhandledPredicates, pushedFilters) =
+ val (unhandledPredicates, pushedFilters, handledFilters ) =
selectFilters(relation.relation, candidatePredicates)
// A set of column attributes that are only referenced by pushed down filters. We can eliminate
@@ -232,10 +253,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val handledSet = {
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
- AttributeSet(handledPredicates.flatMap(_.references)) --
- (projectSet ++ unhandledSet).map(relation.attributeMap)
+ try {
+ AttributeSet(handledPredicates.flatMap(_.references)) --
+ (projectSet ++ unhandledSet).map(relation.attributeMap)
+ } catch {
+ case e => throw new CarbonPhysicalPlanException
+ }
}
-
// Combines all Catalyst filter `Expression`s that are either not convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
@@ -309,6 +333,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
scanBuilder,
candidatePredicates,
pushedFilters,
+ handledFilters,
metadata,
needDecoder,
updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -345,6 +370,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
scanBuilder,
candidatePredicates,
pushedFilters,
+ handledFilters,
metadata,
needDecoder,
updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -359,9 +385,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
output: Seq[Attribute],
partitions: Seq[PartitionSpec],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
- ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
+ ArrayBuffer[AttributeReference], Seq[PartitionSpec])
+ => RDD[InternalRow],
candidatePredicates: Seq[Expression],
- pushedFilters: Seq[Filter],
+ pushedFilters: Seq[Filter], handledFilters: Seq[Filter],
metadata: Map[String, String],
needDecoder: ArrayBuffer[AttributeReference],
updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
@@ -380,19 +407,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
metadata,
relation.catalogTable.map(_.identifier), relation)
} else {
- RowDataSourceScanExec(output,
- scanBuilder(updateRequestedColumns,
- candidatePredicates,
- pushedFilters,
- needDecoder,
- partitions),
- relation.relation,
- getPartitioning(table.carbonTable, updateRequestedColumns),
- metadata,
- relation.catalogTable.map(_.identifier))
+ val partition = getPartitioning(table.carbonTable, updateRequestedColumns)
+ val rdd = scanBuilder(updateRequestedColumns, candidatePredicates,
+ pushedFilters, needDecoder, partitions)
+ CarbonReflectionUtils.getRowDataSourceScanExecObj(relation, output,
+ pushedFilters, handledFilters,
+ rdd, partition, metadata)
}
}
+
def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
relation: CarbonDatasourceHadoopRelation,
needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
@@ -471,7 +495,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
protected[sql] def selectFilters(
relation: BaseRelation,
- predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+ predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Seq[Filter]) = {
// In case of ComplexType dataTypes no filters should be pushed down. IsNotNull is being
// explicitly added by spark and pushed. That also has to be handled and pushed back to
@@ -536,7 +560,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
// a filter to every row or not.
val (_, translatedFilters) = translated.unzip
- (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+ (unrecognizedPredicates ++ unhandledPredicates, translatedFilters, handledFilters)
}
/**
@@ -701,3 +725,5 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
}
}
+
+class CarbonPhysicalPlanException extends Exception
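
The try/catch added to apply() above relies on the planner contract that a strategy returning Nil simply declines to plan the node. A hypothetical sketch of that idea (names are illustrative, not from this commit):

    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // Returning Nil tells Spark's planner "this strategy cannot handle the plan",
    // so it moves on instead of failing the query - the same effect the commit
    // gets by catching CarbonPhysicalPlanException and returning Nil.
    class BestEffortStrategy(tryPlan: LogicalPlan => Seq[SparkPlan]) extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] =
        try tryPlan(plan) catch { case _: Exception => Nil }
    }
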
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 0c9490d..4499b19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.strategy
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command._
@@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-/**
- * Carbon strategies for ddl commands
- */
+ /**
+ * Carbon strategies for ddl commands
+ * CreateDataSourceTableAsSelectCommand class has extra argument in
+ * 2.3, so need to add wrapper to match the case
+ */
+object MatchCreateDataSourceTable {
+ def unapply(plan: LogicalPlan): Option[(CatalogTable, SaveMode, LogicalPlan)] = plan match {
+ case t: CreateDataSourceTableAsSelectCommand => Some(t.table, t.mode, t.query)
+ case _ => None
+ }
+}
class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val LOGGER: LogService =
@@ -231,7 +240,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val cmd =
CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
- case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+ case MatchCreateDataSourceTable(tableDesc, mode, query)
if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
&& (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
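
A self-contained sketch of the extractor trick MatchCreateDataSourceTable uses (hypothetical class, not from the commit): matching on the type and reading fields by name keeps the pattern compiling even when the case class gains an extra constructor argument in a newer Spark version.

    // Imagine "extra" is the argument added in Spark 2.3.
    case class CreateCmd(table: String, mode: String, query: String, extra: Boolean)

    object MatchCreateCmd {
      // Type pattern + field access instead of positional destructuring,
      // so the match site never depends on the constructor arity.
      def unapply(cmd: Any): Option[(String, String, String)] = cmd match {
        case c: CreateCmd => Some((c.table, c.mode, c.query))
        case _ => None
      }
    }

    def describe(cmd: Any): String = cmd match {
      case MatchCreateCmd(table, mode, _) => s"create $table with mode $mode"
      case _ => "not a create command"
    }
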
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 97c37df..62e2d85 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCom
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -74,9 +74,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
"Update operation is not supported for table which has index datamaps")
}
}
- val tableRelation = if (SPARK_VERSION.startsWith("2.1")) {
+ val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
relation
- } else if (SPARK_VERSION.startsWith("2.2")) {
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
alias match {
case Some(_) =>
CarbonReflectionUtils.getSubqueryAlias(
@@ -206,9 +206,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
}
}
// include tuple id in subquery
- if (SPARK_VERSION.startsWith("2.1")) {
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
Project(projList, relation)
- } else if (SPARK_VERSION.startsWith("2.2")) {
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
alias match {
case Some(_) =>
val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
@@ -277,14 +277,13 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
case attr => attr
}
}
- val version = SPARK_VERSION
val newChild: LogicalPlan = if (newChildOutput == child.output) {
- if (version.startsWith("2.1")) {
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
- } else if (version.startsWith("2.2")) {
+ } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
} else {
- throw new UnsupportedOperationException(s"Spark version $version is not supported")
+ throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
}
} else {
Project(newChildOutput, child)
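
The SparkUtil helpers used above are not part of this excerpt; a plausible sketch of what they check, assuming only org.apache.spark.SPARK_VERSION (the real implementation may differ):

    import org.apache.spark.SPARK_VERSION

    object SparkUtilSketch {
      // True when the running Spark version starts with the given prefix, e.g. "2.1".
      def isSparkVersionEqualTo(version: String): Boolean =
        SPARK_VERSION.startsWith(version)

      // True when the running major.minor version is at or above the given "x.y".
      def isSparkVersionXandAbove(version: String): Boolean = {
        val Array(major, minor) = SPARK_VERSION.split("\\.").take(2).map(_.toInt)
        val Array(tgtMajor, tgtMinor) = version.split("\\.").take(2).map(_.toInt)
        major > tgtMajor || (major == tgtMajor && minor >= tgtMinor)
      }
    }
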
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 70e61bc..1840c5d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,7 +23,6 @@ import java.net.URI
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,7 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -43,9 +43,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.ThriftWriter
@@ -72,6 +71,13 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
+object MatchLogicalRelation {
+ def unapply(logicalPlan: LogicalPlan): Option[(BaseRelation, Any, Any)] = logicalPlan match {
+ case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable)
+ case _ => None
+ }
+}
+
class CarbonFileMetastore extends CarbonMetaStore {
@transient
@@ -143,13 +149,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
sparkSession.catalog.currentDatabase)
val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+ MatchLogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
carbonDatasourceHadoopRelation.carbonRelation
- case LogicalRelation(
+ case MatchLogicalRelation(
carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
carbonDatasourceHadoopRelation.carbonRelation
case SubqueryAlias(_, c)
- if SPARK_VERSION.startsWith("2.2") &&
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
(c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
c.getClass.getName.equals(
@@ -598,13 +604,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
relation match {
case SubqueryAlias(_,
- LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+ MatchLogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
carbonDataSourceHadoopRelation
- case LogicalRelation(
+ case MatchLogicalRelation(
carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
carbonDataSourceHadoopRelation
case SubqueryAlias(_, c)
- if SPARK_VERSION.startsWith("2.2") &&
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
(c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName
.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index c59246d..76ff41a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -273,11 +273,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case attr: AttributeReference =>
updatedExpression.find { p => p._1.sameRef(attr) } match {
case Some((_, childAttr)) =>
- AttributeReference(
+ CarbonToSparkAdapater.createAttributeReference(
childAttr.name,
childAttr.dataType,
childAttr.nullable,
- childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+ childAttr.metadata,
+ childAttr.exprId,
+ attr.qualifier,
+ attr)
case None =>
attr
}
@@ -296,11 +299,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case attr: AttributeReference =>
updatedExpression.find { p => p._1.sameRef(attr) } match {
case Some((_, childAttr)) =>
- AttributeReference(
+ CarbonToSparkAdapater.createAttributeReference(
childAttr.name,
childAttr.dataType,
childAttr.nullable,
- childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+ childAttr.metadata,
+ childAttr.exprId,
+ attr.qualifier,
+ attr)
case None =>
attr
}
@@ -777,24 +783,36 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
val factAlias = factPlanExpForStreaming(name)
// create attribute reference object for each expression
val attrs = factAlias.map { factAlias =>
- AttributeReference(
+ CarbonToSparkAdapater.createAttributeReference(
name,
alias.dataType,
- alias.nullable) (factAlias.exprId, alias.qualifier, alias.isGenerated)
+ alias.nullable,
+ Metadata.empty,
+ factAlias.exprId,
+ alias.qualifier,
+ alias)
}
// add aggregate function in Aggregate node added for handling streaming
// to aggregate results from fact and aggregate table
val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs)
// same reference id will be used as it can be used by above nodes in the plan like
// sort, project, join
- Alias(
+ CarbonToSparkAdapater.createAliasRef(
updatedAggExp.head,
- name)(alias.exprId, alias.qualifier, Option(alias.metadata), alias.isGenerated)
+ name,
+ alias.exprId,
+ alias.qualifier,
+ Option(alias.metadata),
+ Some(alias))
case alias@Alias(expression, name) =>
- AttributeReference(
+ CarbonToSparkAdapater.createAttributeReference(
name,
alias.dataType,
- alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+ alias.nullable,
+ Metadata.empty,
+ alias.exprId,
+ alias.qualifier,
+ alias)
}
updatedExp
}
@@ -897,9 +915,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case attr: AttributeReference =>
newAggExp += attr
case exp: Expression =>
- newAggExp += Alias(
+ newAggExp += CarbonToSparkAdapater.createAliasRef(
exp,
- "dummy_" + counter)(NamedExpression.newExprId, None, None, false)
+ "dummy_" + counter,
+ NamedExpression.newExprId)
counter = counter + 1
}
}
@@ -923,12 +942,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// get the new aggregate expression
val newAggExp = getAggFunctionForFactStreaming(aggExp)
val updatedExp = newAggExp.map { exp =>
- Alias(exp,
- name)(
- NamedExpression.newExprId,
- alias.qualifier,
+ CarbonToSparkAdapater.createAliasRef(exp,
+ name,
+ NamedExpression.newExprId,
+ alias.qualifier,
Some(alias.metadata),
- alias.isGenerated)
+ Some(alias))
}
// adding to map which will be used while Adding an Aggregate node for handling streaming
// table plan change
@@ -936,10 +955,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
updatedExp
case alias@Alias(exp: Expression, name) =>
val newAlias = Seq(alias)
- val attr = AttributeReference(name,
- alias.dataType,
- alias.nullable,
- alias.metadata) (alias.exprId, alias.qualifier, alias.isGenerated)
+ val attr = CarbonToSparkAdapater.createAttributeReference(name,
+ alias.dataType,
+ alias.nullable,
+ alias.metadata,
+ alias.exprId,
+ alias.qualifier,
+ alias)
factPlanGrpExpForStreaming.put(
AggExpToColumnMappingModel(
removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes))),
@@ -1093,11 +1115,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
private def removeQualifiers(expression: Expression) : Expression = {
expression.transform {
case attr: AttributeReference =>
- AttributeReference(
+ CarbonToSparkAdapater.createAttributeReference(
attr.name,
attr.dataType,
attr.nullable,
- attr.metadata)(attr.exprId, None, attr.isGenerated)
+ attr.metadata,
+ attr.exprId,
+ None,
+ attr)
}
}
@@ -1363,10 +1388,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
attr,
attributes)
val newExpressionId = NamedExpression.newExprId
- val childTableAttr = AttributeReference(attr.name,
+ val childTableAttr = CarbonToSparkAdapater.createAttributeReference(attr.name,
childAttr.dataType,
childAttr.nullable,
- childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated)
+ childAttr.metadata,
+ newExpressionId,
+ childAttr.qualifier,
+ attr)
updatedExpression.put(attr, childTableAttr)
// returning the alias to show proper column name in output
Seq(Alias(childAttr,
@@ -1378,12 +1406,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
attr,
attributes)
val newExpressionId = NamedExpression.newExprId
- val parentTableAttr = AttributeReference(name,
+ val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
alias.dataType,
- alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
- val childTableAttr = AttributeReference(name,
+ alias.nullable,
+ Metadata.empty,
+ alias.exprId,
+ alias.qualifier,
+ alias)
+ val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
alias.dataType,
- alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+ alias.nullable,
+ Metadata.empty,
+ newExpressionId,
+ alias.qualifier,
+ alias)
updatedExpression.put(parentTableAttr, childTableAttr)
// returning alias with child attribute reference
Seq(Alias(childAttr,
@@ -1409,13 +1445,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
val newExpressionId = NamedExpression.newExprId
// create a parent attribute reference which will be replced on node which may be referred
// by node like sort join
- val parentTableAttr = AttributeReference(name,
+ val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
alias.dataType,
- alias.nullable)(alias.exprId, alias.qualifier, alias.isGenerated)
+ alias.nullable,
+ Metadata.empty,
+ alias.exprId,
+ alias.qualifier,
+ alias)
// creating a child attribute reference which will be replced
- val childTableAttr = AttributeReference(name,
+ val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
alias.dataType,
- alias.nullable)(newExpressionId, alias.qualifier, alias.isGenerated)
+ alias.nullable,
+ Metadata.empty,
+ newExpressionId,
+ alias.qualifier,
+ alias)
// adding to map, will be used during other node updation like sort, join, project
updatedExpression.put(parentTableAttr, childTableAttr)
// returning alias with child attribute reference
@@ -1426,12 +1470,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// for streaming table
// create alias for aggregate table
val aggExpForStreaming = aggExp.map{ exp =>
- Alias(exp,
- name)(
+ CarbonToSparkAdapater.createAliasRef(exp,
+ name,
NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
- alias.isGenerated).asInstanceOf[NamedExpression]
+ Some(alias)).asInstanceOf[NamedExpression]
}
aggExpForStreaming
}
@@ -1460,12 +1504,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
}
val newExpressionId = NamedExpression.newExprId
- val parentTableAttr = AttributeReference(name,
+ val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
alias.dataType,
- alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
- val childTableAttr = AttributeReference(name,
+ alias.nullable,
+ Metadata.empty,
+ alias.exprId,
+ alias.qualifier,
+ alias)
+ val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
alias.dataType,
- alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+ alias.nullable,
+ Metadata.empty,
+ newExpressionId,
+ alias.qualifier,
+ alias)
updatedExpression.put(parentTableAttr, childTableAttr)
Seq(Alias(updatedExp, name)(newExpressionId,
alias.qualifier).asInstanceOf[NamedExpression])
@@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
// named expression list otherwise update the list and add it to set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
namedExpressionList +=
- Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+ CarbonToSparkAdapater.createAliasRef(expressions.head,
+ name + "_ sum",
+ NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
- alias.isGenerated)
+ Some(alias))
validExpressionsMap += AggExpToColumnMappingModel(sumExp)
}
// check with same expression already count is present then do not add to
// named expression list otherwise update the list and add it to set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
namedExpressionList +=
- Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
- alias.qualifier,
- Some(alias.metadata),
- alias.isGenerated)
+ CarbonToSparkAdapater.createAliasRef(expressions.last, name + "_ count",
+ NamedExpression.newExprId,
+ alias.qualifier,
+ Some(alias.metadata),
+ Some(alias))
validExpressionsMap += AggExpToColumnMappingModel(countExp)
}
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 450902a..b96b6a7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.CarbonSessionCatalog
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
@@ -49,6 +50,7 @@ import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit}
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
/**
* All filter conversions are done here.
*/
@@ -297,6 +299,27 @@ object CarbonFilters {
filters.flatMap(translate(_, false)).toArray
}
+ /**
+ * This API checks whether StringTrim object is compatible with
+ * carbon,carbon only deals with the space any other symbol should
+ * be ignored.So condition is SPARK version < 2.3.
+ * If it is 2.3 then trimStr field should be empty
+ *
+ * @param stringTrim
+ * @return
+ */
+ def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
+ var isCompatible = true
+ if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+ .asInstanceOf[Option[Expression]]
+ if (trimStr.isDefined) {
+ isCompatible = false
+ }
+ }
+ isCompatible
+ }
+
def transformExpression(expr: Expression): CarbonExpression = {
expr match {
case Or(left, right)
@@ -385,7 +408,8 @@ object CarbonFilters {
new CarbonLiteralExpression(maxValueLimit,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)))
new AndExpression(l, r)
- case StringTrim(child) => transformExpression(child)
+ case strTrim: StringTrim if isStringTrimCompatibleWithCarbon(strTrim) =>
+ transformExpression(strTrim)
case s: ScalaUDF =>
new MatchExpression(s.children.head.toString())
case _ =>
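
The reflective read of trimStr above keeps the file compiling against Spark 2.1/2.2, where StringTrim has no such field. A hypothetical stand-in for that CarbonReflectionUtils.getField call, written with plain java.lang.reflect (the project's real helper is not shown in this patch):

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Only safe to call when the field exists, i.e. after the 2.3-and-above check.
    def readOptionalField(obj: AnyRef, name: String): Option[Expression] = {
      val field = obj.getClass.getDeclaredField(name)
      field.setAccessible(true)
      field.get(obj).asInstanceOf[Option[Expression]]
    }
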
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index b81d0a1..48c6377 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -786,8 +786,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
p.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
- a.explicitMetadata, a.isGenerated)
+ CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
+ a.name,
+ a.exprId,
+ a.qualifier,
+ a.explicitMetadata,
+ Some(a))
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
CustomDeterministicExpression(exp)
@@ -800,8 +804,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
f.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
- a.explicitMetadata, a.isGenerated)
+ CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
+ a.name,
+ a.exprId,
+ a.qualifier,
+ a.explicitMetadata,
+ Some(a))
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
CustomDeterministicExpression(exp)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 8eb47fc..1622724 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
import scala.language.implicitConversions
-import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable}
+import org.apache.spark.sql.{CarbonToSparkAdapater, DeleteRecords, UpdateTable}
import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -479,7 +479,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
logicalPlan match {
case _: CarbonCreateTableCommand =>
ExplainCommand(logicalPlan, extended = isExtended.isDefined)
- case _ => ExplainCommand(OneRowRelation)
+ case _ => CarbonToSparkAdapater.getExplainCommandObj
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..d5fe6a4
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+ def addSparkListener(sparkContext: SparkContext) = {
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ SparkSession.sqlListener.set(null)
+ }
+ })
+ }
+
+ def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+ metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+ attrRef : NamedExpression): AttributeReference = {
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qualifier,attrRef.isGenerated)
+ }
+
+ def createAliasRef(child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr: Option[NamedExpression] = None): Alias = {
+ val isGenerated:Boolean = if (namedExpr.isDefined) {
+ namedExpr.get.isGenerated
+ } else {
+ false
+ }
+ Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+ }
+
+ def getExplainCommandObj() : ExplainCommand = {
+ ExplainCommand(OneRowRelation)
+ }
+
+ def getPartitionKeyFilter(
+ partitionSet: AttributeSet,
+ filterPredicates: Seq[Expression]): ExpressionSet = {
+ ExpressionSet(
+ ExpressionSet(filterPredicates)
+ .filter(_.references.subsetOf(partitionSet)))
+ }
+}
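
A hypothetical spark-shell style usage of the adapter above (values are illustrative, not from the commit); only the 2.1 implementation is shown in this patch, but callers such as CarbonPreAggregateQueryRules stay version-agnostic by going through these two methods:

    import org.apache.spark.sql.CarbonToSparkAdapater
    import org.apache.spark.sql.catalyst.expressions.{Literal, NamedExpression}
    import org.apache.spark.sql.types.Metadata

    // Create an Alias with default exprId/qualifier/metadata, then a matching
    // AttributeReference that reuses the alias for the version-dependent extras.
    val alias = CarbonToSparkAdapater.createAliasRef(Literal(1), "one")
    val attr = CarbonToSparkAdapater.createAttributeReference(
      "one", alias.dataType, alias.nullable, Metadata.empty,
      NamedExpression.newExprId, None, alias)
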
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 7fa01e9..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.1 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
- private ColumnarBatch columnarBatch;
-
- /**
- * Allocates the backing ColumnarBatch for the given schema.
- *
- * @param memMode whether the vectors are allocated on-heap or off-heap.
- * @param rowNum number of rows to allocate for vector reading.
- * @param structFileds metadata describing the current table schema.
- */
- public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
- columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
- }
-
- public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
- columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
- }
-
- /**
- * Sets the number of rows in this batch.
- */
- public void setNumRows(int numRows) {
- columnarBatch.setNumRows(numRows);
- }
-
- /**
- * Returns the number of rows for read, including filtered rows.
- */
- public int numRows() {
- return columnarBatch.capacity();
- }
-
- /**
- * Called to close all the columns in this batch. It is not valid to access the data after
- * calling this. This must be called at the end to clean up memory allocations.
- */
- public void close() {
- columnarBatch.close();
- }
-
- /**
- * Returns the row in this batch at `rowId`. Returned row is reused across calls.
- */
- public InternalRow getRow(int rowId) {
- return columnarBatch.getRow(rowId);
- }
-
- /**
- * Returns the underlying ColumnarBatch of this proxy.
- */
- public Object getColumnarBatch() {
- return columnarBatch;
- }
-
- public void resetDictionaryIds(int ordinal) {
- columnarBatch.column(ordinal).getDictionaryIds().reset();
- }
-
- /**
- * Resets this column for writing. The currently stored values are no longer accessible.
- */
- public void reset() {
- columnarBatch.reset();
- }
-
- public void putRowToColumnBatch(int rowId, Object value, int offset) {
- org.apache.spark.sql.types.DataType t = dataType(offset);
- if (null == value) {
- putNull(rowId, offset);
- } else {
- if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
- putBoolean(rowId, (boolean) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
- putByte(rowId, (byte) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
- putShort(rowId, (short) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
- putInt(rowId, (int) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
- putLong(rowId, (long) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
- putFloat(rowId, (float) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
- putDouble(rowId, (double) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- putByteArray(rowId, v.getBytes(), offset);
- } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
- DecimalType dt = (DecimalType) t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- putInt(rowId, (int) d.toUnscaledLong(), offset);
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- putLong(rowId, d.toUnscaledLong(), offset);
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- putByteArray(rowId, bytes, 0, bytes.length, offset);
- }
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
- columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
- } else if (t instanceof org.apache.spark.sql.types.DateType) {
- putInt(rowId, (int) value, offset);
- } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
- putLong(rowId, (long) value, offset);
- }
- }
- }
-
- public void putBoolean(int rowId, boolean value, int ordinal) {
- columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
- }
-
- public void putByte(int rowId, byte value, int ordinal) {
- columnarBatch.column(ordinal).putByte(rowId, (byte) value);
- }
-
- public void putShort(int rowId, short value, int ordinal) {
- columnarBatch.column(ordinal).putShort(rowId, (short) value);
- }
-
- public void putInt(int rowId, int value, int ordinal) {
- columnarBatch.column(ordinal).putInt(rowId, (int) value);
- }
-
- public void putFloat(int rowId, float value, int ordinal) {
- columnarBatch.column(ordinal).putFloat(rowId, (float) value);
- }
-
- public void putLong(int rowId, long value, int ordinal) {
- columnarBatch.column(ordinal).putLong(rowId, (long) value);
- }
-
- public void putDouble(int rowId, double value, int ordinal) {
- columnarBatch.column(ordinal).putDouble(rowId, (double) value);
- }
-
- public void putByteArray(int rowId, byte[] value, int ordinal) {
- columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
- }
-
- public void putInts(int rowId, int count, int value, int ordinal) {
- columnarBatch.column(ordinal).putInts(rowId, count, value);
- }
-
- public void putShorts(int rowId, int count, short value, int ordinal) {
- columnarBatch.column(ordinal).putShorts(rowId, count, value);
- }
-
- public void putLongs(int rowId, int count, long value, int ordinal) {
- columnarBatch.column(ordinal).putLongs(rowId, count, value);
- }
-
- public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
- columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-
- }
-
- public void putDoubles(int rowId, int count, double value, int ordinal) {
- columnarBatch.column(ordinal).putDoubles(rowId, count, value);
- }
-
- public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
- columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
- }
-
- public void putNull(int rowId, int ordinal) {
- columnarBatch.column(ordinal).putNull(rowId);
- }
-
- public void putNulls(int rowId, int count, int ordinal) {
- columnarBatch.column(ordinal).putNulls(rowId, count);
- }
-
- public boolean isNullAt(int rowId, int ordinal) {
- return columnarBatch.column(ordinal).isNullAt(rowId);
- }
-
- public DataType dataType(int ordinal) {
- return columnarBatch.column(ordinal).dataType();
- }
-}
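
A minimal sketch of driving this proxy API from Scala, assuming the common replacement class keeps the same constructor and put/read methods as the version-specific class deleted above; the schema, values and object name are hypothetical:

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.CarbonVectorProxy
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object VectorProxySketch {
  def fillBatch(): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true)))
    val proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 4)
    proxy.reset()                                                     // clear any previous batch contents
    proxy.putRowToColumnBatch(0, Integer.valueOf(42), 0)              // row 0, column 0 (int)
    proxy.putRowToColumnBatch(0, UTF8String.fromString("carbon"), 1)  // row 0, column 1 (string)
    proxy.putNull(1, 0)                                               // row 1, column 0 is null
    proxy.putRowToColumnBatch(1, UTF8String.fromString("data"), 1)    // row 1, column 1
    proxy.setNumRows(2)                                               // rows actually written
    val batch = proxy.getColumnarBatch                                // handed back to Spark for scanning
    proxy.close()                                                     // release the vector memory
  }
}
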
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 989b1d5..dd690e4 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -34,11 +34,9 @@ import org.apache.spark.sql.types.StructType
/**
* Create table 'using carbondata' and insert the query result into it.
- *
* @param table the Catalog Table
* @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append
* @param query the query whose result will be insert into the new relation
- *
*/
case class CreateCarbonSourceTableAsSelectCommand(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..7a68e3e
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+ def addSparkListener(sparkContext: SparkContext) = {
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ SparkSession.sqlListener.set(null)
+ }
+ })
+ }
+
+ def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+ metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+ attrRef: NamedExpression): AttributeReference = {
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qualifier, attrRef.isGenerated)
+ }
+
+ def createAliasRef(child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr: Option[NamedExpression] = None): Alias = {
+ val isGenerated: Boolean = if (namedExpr.isDefined) {
+ namedExpr.get.isGenerated
+ } else {
+ false
+ }
+ Alias(child, name)(exprId, qualifier, explicitMetadata, isGenerated)
+ }
+
+ def getExplainCommandObj() : ExplainCommand = {
+ ExplainCommand(OneRowRelation)
+ }
+
+ def getPartitionKeyFilter(
+ partitionSet: AttributeSet,
+ filterPredicates: Seq[Expression]): ExpressionSet = {
+ ExpressionSet(
+ ExpressionSet(filterPredicates)
+ .filter(_.references.subsetOf(partitionSet)))
+ }
+
+ def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] = {
+ Seq(OptimizeCodegen(conf))
+ }
+}
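
The extra getOptimizeCodegenRule hook exists only on the 2.2 profile because the OptimizeCodegen rule is specific to that Spark version. A hypothetical sketch of appending it to a session's extra optimizer rules (the patch's actual wiring is not shown in this hunk, and the object and method names below are made up):

package org.apache.spark.sql

object OptimizeCodegenWiringSketch {
  // Placed in org.apache.spark.sql, like the adapter itself, so sessionState is accessible.
  def register(spark: SparkSession): Unit = {
    spark.experimental.extraOptimizations =
      spark.experimental.extraOptimizations ++
        CarbonToSparkAdapater.getOptimizeCodegenRule(spark.sessionState.conf)
  }
}
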
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 944b32e..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.2 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
- private ColumnarBatch columnarBatch;
-
- /**
- * Allocates the backing ColumnarBatch for the given schema.
- *
- * @param memMode whether the vectors are allocated on-heap or off-heap.
- * @param rowNum number of rows to allocate for vector reading.
- * @param structFileds metadata describing the current table schema.
- */
- public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
- columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
- }
-
- public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
- columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
- }
-
- /**
- * Sets the number of rows in this batch.
- */
- public void setNumRows(int numRows) {
- columnarBatch.setNumRows(numRows);
- }
-
- /**
- * Returns the number of rows for read, including filtered rows.
- */
- public int numRows() {
- return columnarBatch.capacity();
- }
-
- /**
- * Called to close all the columns in this batch. It is not valid to access the data after
- * calling this. This must be called at the end to clean up memory allocations.
- */
- public void close() {
- columnarBatch.close();
- }
-
- /**
- * Returns the row in this batch at `rowId`. Returned row is reused across calls.
- */
- public InternalRow getRow(int rowId) {
- return columnarBatch.getRow(rowId);
- }
-
- /**
- * Returns the underlying ColumnarBatch of this proxy.
- */
- public Object getColumnarBatch() {
- return columnarBatch;
- }
-
- public Object reserveDictionaryIds(int capacity, int ordinal) {
- return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
- }
-
- public void resetDictionaryIds(int ordinal) {
- columnarBatch.column(ordinal).getDictionaryIds().reset();
- }
-
- /**
- * Resets this column for writing. The currently stored values are no longer accessible.
- */
- public void reset() {
- columnarBatch.reset();
- }
-
- public void putRowToColumnBatch(int rowId, Object value, int offset) {
- org.apache.spark.sql.types.DataType t = dataType(offset);
- if (null == value) {
- putNull(rowId, offset);
- } else {
- if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
- putBoolean(rowId, (boolean) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
- putByte(rowId, (byte) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
- putShort(rowId, (short) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
- putInt(rowId, (int) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
- putLong(rowId, (long) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
- putFloat(rowId, (float) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
- putDouble(rowId, (double) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- putByteArray(rowId, v.getBytes(), offset);
- } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
- DecimalType dt = (DecimalType) t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- putInt(rowId, (int) d.toUnscaledLong(), offset);
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- putLong(rowId, d.toUnscaledLong(), offset);
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- putByteArray(rowId, bytes, 0, bytes.length, offset);
- }
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
- columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
- } else if (t instanceof org.apache.spark.sql.types.DateType) {
- putInt(rowId, (int) value, offset);
- } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
- putLong(rowId, (long) value, offset);
- }
- }
- }
-
- public void putBoolean(int rowId, boolean value, int ordinal) {
- columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
- }
-
- public void putByte(int rowId, byte value, int ordinal) {
- columnarBatch.column(ordinal).putByte(rowId, (byte) value);
- }
-
- public void putShort(int rowId, short value, int ordinal) {
- columnarBatch.column(ordinal).putShort(rowId, (short) value);
- }
-
- public void putInt(int rowId, int value, int ordinal) {
- columnarBatch.column(ordinal).putInt(rowId, (int) value);
- }
-
- public void putFloat(int rowId, float value, int ordinal) {
- columnarBatch.column(ordinal).putFloat(rowId, (float) value);
- }
-
- public void putLong(int rowId, long value, int ordinal) {
- columnarBatch.column(ordinal).putLong(rowId, (long) value);
- }
-
- public void putDouble(int rowId, double value, int ordinal) {
- columnarBatch.column(ordinal).putDouble(rowId, (double) value);
- }
-
- public void putByteArray(int rowId, byte[] value, int ordinal) {
- columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
- }
-
- public void putInts(int rowId, int count, int value, int ordinal) {
- columnarBatch.column(ordinal).putInts(rowId, count, value);
- }
-
- public void putShorts(int rowId, int count, short value, int ordinal) {
- columnarBatch.column(ordinal).putShorts(rowId, count, value);
- }
-
- public void putLongs(int rowId, int count, long value, int ordinal) {
- columnarBatch.column(ordinal).putLongs(rowId, count, value);
- }
-
- public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
- columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
- }
-
- public void putDoubles(int rowId, int count, double value, int ordinal) {
- columnarBatch.column(ordinal).putDoubles(rowId, count, value);
- }
-
- public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
- columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
- }
-
- public void putNull(int rowId, int ordinal) {
- columnarBatch.column(ordinal).putNull(rowId);
- }
-
- public void putNulls(int rowId, int count, int ordinal) {
- columnarBatch.column(ordinal).putNulls(rowId, count);
- }
-
- public boolean isNullAt(int rowId, int ordinal) {
- return columnarBatch.column(ordinal).isNullAt(rowId);
- }
-
- public DataType dataType(int ordinal) {
- return columnarBatch.column(ordinal).dataType();
- }
-}
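
The decimal branch of putRowToColumnBatch above mirrors Spark's compact decimal encoding: precisions up to Decimal.MAX_INT_DIGITS store the unscaled value in an int, up to Decimal.MAX_LONG_DIGITS in a long, and anything wider falls back to the unscaled BigInteger's bytes. A small standalone sketch of that dispatch, independent of the proxy (names hypothetical):

import java.math.BigDecimal

import org.apache.spark.sql.types.Decimal

object DecimalEncodingSketch {
  // Shows which physical representation a decimal of the given precision takes
  // inside the column vector: unscaled int, unscaled long, or raw unscaled bytes.
  def encode(value: BigDecimal, precision: Int): Any = {
    val d = Decimal.fromDecimal(value)
    if (precision <= Decimal.MAX_INT_DIGITS) {
      d.toUnscaledLong.toInt                         // precision 9 or less
    } else if (precision <= Decimal.MAX_LONG_DIGITS) {
      d.toUnscaledLong                               // up to 18 digits
    } else {
      d.toJavaBigDecimal.unscaledValue.toByteArray   // arbitrary precision
    }
  }
}
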