You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/05 12:40:13 UTC

[5/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version(Make API changes in carbon to be compatible with spark 2.3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 124413d..747b064 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -18,17 +18,14 @@
 package org.apache.carbondata.stream;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -78,12 +75,11 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
 import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
 
 /**
  * Stream record reader
@@ -117,7 +113,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
 
   // vectorized reader
   private StructType outputSchema;
-  private Object vectorProxy;
+  private CarbonVectorProxy vectorProxy;
   private boolean isFinished = false;
 
   // filter
@@ -143,9 +139,6 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   // InputMetricsStats
   private InputMetricsStats inputMetricsStats;
 
-  private static final LogService LOGGER =
-          LogServiceFactory.getLogService(CarbonStreamRecordReader.class.getName());
-
   public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
       QueryModel mdl, boolean useRawRow) {
     this.isVectorReader = isVectorReader;
@@ -400,21 +393,15 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     return null;
   }
 
-  @Override public Object getCurrentValue() throws IOException, InterruptedException {
-    if (isVectorReader) {
-      Method method = null;
-      try {
-        method = vectorProxy.getClass().getMethod("numRows");
-        int value = (int) method.invoke(vectorProxy);
-        if (inputMetricsStats != null) {
-          inputMetricsStats.incrementRecordRead((long) value);
+    @Override public Object getCurrentValue() throws IOException, InterruptedException {
+        if (isVectorReader) {
+            int value = vectorProxy.numRows();
+            if (inputMetricsStats != null) {
+                inputMetricsStats.incrementRecordRead((long) value);
+            }
+
+            return vectorProxy.getColumnarBatch();
         }
-        method = vectorProxy.getClass().getMethod("getColumnarBatch");
-        return method.invoke(vectorProxy);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
 
     if (inputMetricsStats != null) {
       inputMetricsStats.incrementRecordRead(1L);
@@ -440,50 +427,39 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     return true;
   }
 
-  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
-    Constructor cons = null;
-    // if filter is null and output projection is empty, use the row number of blocklet header
-    int rowNum = 0;
-    String methodName = "setNumRows";
-    try {
-      String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy";
-      cons = CarbonStreamUtils.getConstructorWithReflection(vectorReaderClassName, MemoryMode.class,
-              StructType.class, int.class);
-      if (skipScanData) {
-
-        int rowNums = header.getBlocklet_info().getNum_rows();
-        vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP, outputSchema, rowNums);
-        Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
-        setNumRowsMethod.invoke(vectorProxy, rowNums);
-        input.skipBlockletData(true);
-        return rowNums > 0;
-      }
-      input.readBlockletData(header);
-      vectorProxy = cons.newInstance(MemoryMode.OFF_HEAP,outputSchema, input.getRowNums());
-      if (null == filter) {
-        while (input.hasNext()) {
-          readRowFromStream();
-          putRowToColumnBatch(rowNum++);
+    private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+        // if filter is null and output projection is empty, use the row number of blocklet header
+        if (skipScanData) {
+            int rowNums = header.getBlocklet_info().getNum_rows();
+            vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,rowNums);
+            vectorProxy.setNumRows(rowNums);
+            input.skipBlockletData(true);
+            return rowNums > 0;
         }
-      } else {
-        try {
-          while (input.hasNext()) {
-            readRowFromStream();
-            if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
-              putRowToColumnBatch(rowNum++);
+
+        input.readBlockletData(header);
+        vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,input.getRowNums());
+        int rowNum = 0;
+        if (null == filter) {
+            while (input.hasNext()) {
+                readRowFromStream();
+                putRowToColumnBatch(rowNum++);
+            }
+        } else {
+            try {
+                while (input.hasNext()) {
+                    readRowFromStream();
+                    if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+                        putRowToColumnBatch(rowNum++);
+                    }
+                }
+            } catch (FilterUnsupportedException e) {
+                throw new IOException("Failed to filter row in vector reader", e);
             }
-          }
-        } catch (FilterUnsupportedException e) {
-          throw new IOException("Failed to filter row in vector reader", e);
         }
-      }
-      Method setNumRowsMethod = vectorProxy.getClass().getMethod(methodName, int.class);
-      setNumRowsMethod.invoke(vectorProxy, rowNum);
-    } catch (Exception e) {
-      throw new IOException("Failed to fill row in  vector reader", e);
+        vectorProxy.setNumRows(rowNum);
+        return rowNum > 0;
     }
-    return rowNum > 0;
-  }
 
   private void readRowFromStream() {
     input.nextRow();
@@ -719,43 +695,24 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     }
   }
 
-  private void putRowToColumnBatch(int rowId) {
-    Class<?>[] paramTypes = {int.class, Object.class, int.class};
-    Method putRowToColumnBatch = null;
-    try {
-      putRowToColumnBatch = vectorProxy.getClass().getMethod("putRowToColumnBatch", paramTypes);
+    private void putRowToColumnBatch(int rowId) {
+        for (int i = 0; i < projection.length; i++) {
+            Object value = outputValues[i];
+            vectorProxy.putRowToColumnBatch(rowId,value,i);
 
-    } catch (Exception e) {
-      LOGGER.error(
-              "Unable to put the row in the vector" + "rowid: " + rowId + e);
-    }
-    for (int i = 0; i < projection.length; i++) {
-      Object value = outputValues[i];
-      try {
-        putRowToColumnBatch.invoke(vectorProxy, rowId, value, i);
-      } catch (Exception e) {
-        LOGGER.error(
-                "Unable to put the row in the vector" + "rowid: " + rowId + e);
-      }
+        }
     }
-  }
 
-  @Override public float getProgress() throws IOException, InterruptedException {
-    return 0;
-  }
-
-  @Override public void close() throws IOException {
-    if (null != input) {
-      input.close();
+    @Override public float getProgress() throws IOException, InterruptedException {
+        return 0;
     }
-    if (null != vectorProxy) {
-      try {
-        Method closeMethod = vectorProxy.getClass().getMethod("close");
-        closeMethod.invoke(vectorProxy);
-      } catch (Exception e) {
-        LOGGER.error(
-                "Unable to close the stream vector reader" + e);
-      }
+
+    @Override public void close() throws IOException {
+        if (null != input) {
+            input.close();
+        }
+        if (null != vectorProxy) {
+            vectorProxy.close();
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 6f038eb..41fc013 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -20,7 +20,6 @@ import java.lang.Long
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.TaskMetricsMap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d433470..450ead1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -21,16 +21,11 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.HiveSessionCatalog
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types.{StringType, TimestampType}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 case class CarbonDictionaryCatalystDecoder(
@@ -125,22 +120,40 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
 object CountStarPlan {
   type ReturnType = (mutable.MutableList[Attribute], LogicalPlan)
 
-  /**
-   * It fill count star query attribute.
-   */
-  private def fillCountStarAttribute(
-      expr: Expression,
-      outputColumns: mutable.MutableList[Attribute]) {
-    expr match {
-      case par@Alias(_, _) =>
-        val head = par.children.head.children.head
-        head match {
-          case count: Count if count.children.head.isInstanceOf[Literal] =>
-            outputColumns += par.toAttribute
-          case _ =>
-        }
-    }
-  }
+ /**
+  * It fill count star query attribute.
+  * 2.2.1 plan
+  * Aggregate [count(1) AS count(1)#30L]
+  * +- Project
+  *
+  *2.3.0 plan
+  * Aggregate [cast(count(1) as string) AS count(1)#29]
+  * +- Project
+  */
+ private def fillCountStarAttribute(
+     expr: Expression,
+     outputColumns: mutable.MutableList[Attribute]) {
+   expr match {
+     case par@Alias(cast: Cast, _) =>
+       if (cast.child.isInstanceOf[AggregateExpression]) {
+         val head = cast.child.children.head
+         head match {
+           case count: Count if count.children.head.isInstanceOf[Literal] =>
+             outputColumns += par.toAttribute
+           case _ =>
+         }
+       }
+     case par@Alias(child, _) =>
+       if (child.isInstanceOf[AggregateExpression]) {
+         val head = child.children.head
+         head match {
+           case count: Count if count.children.head.isInstanceOf[Literal] =>
+             outputColumns += par.toAttribute
+           case _ =>
+         }
+       }
+   }
+ }
 
   def unapply(plan: LogicalPlan): Option[ReturnType] = {
     plan match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 9b78db0..ac8eb64 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -64,7 +66,13 @@ case class CarbonCountStar(
             carbonTable.getTableName,
             Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
       carbonTable)
-    val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
+    val valueRaw =
+      attributesRaw.head.dataType match {
+        case StringType => Seq(UTF8String.fromString(Long.box(rowCount).toString)).toArray
+          .asInstanceOf[Array[Any]]
+        case _ => Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]
+      }
+    val value = new GenericInternalRow(valueRaw)
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
     val row = if (outUnsafeRows) unsafeProjection(value) else value
     sparkContext.parallelize(Seq(row))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 96a8162..d6117de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -356,12 +356,7 @@ object CarbonSession {
         // Register a successfully instantiated context to the singleton. This should be at the
         // end of the class definition so that the singleton is updated only if there is no
         // exception in the construction of the instance.
-        sparkContext.addSparkListener(new SparkListener {
-          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-            SparkSession.setDefaultSession(null)
-            SparkSession.sqlListener.set(null)
-          }
-        })
+        CarbonToSparkAdapater.addSparkListener(sparkContext)
         session.streams.addListener(new CarbonStreamingQueryListener(session))
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 6312746..1ff9bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -34,8 +34,6 @@ case class CustomDeterministicExpression(nonDt: Expression ) extends Expression
 
   override def children: Seq[Expression] = Seq()
 
-  override def deterministic: Boolean = true
-
   def childexp : Expression = nonDt
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 516f9af..52801b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -884,17 +884,21 @@ case class CarbonLoadDataCommand(
       // datatype is always int
       val column = table.getColumnByName(table.getTableName, attr.name)
       if (column.hasEncoding(Encoding.DICTIONARY)) {
-        AttributeReference(
-          attr.name,
+        CarbonToSparkAdapater.createAttributeReference(attr.name,
           IntegerType,
           attr.nullable,
-          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+          attr.metadata,
+          attr.exprId,
+          attr.qualifier,
+          attr)
       } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
-        AttributeReference(
-          attr.name,
+        CarbonToSparkAdapater.createAttributeReference(attr.name,
           LongType,
           attr.nullable,
-          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+          attr.metadata,
+          attr.exprId,
+          attr.qualifier,
+          attr)
       } else {
         attr
       }
@@ -1095,7 +1099,8 @@ case class CarbonLoadDataCommand(
 
     CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
       hdfsRelation.schema.toAttributes,
-      Some(catalogTable))
+      Some(catalogTable),
+      false)
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 1fcba3e..8f128fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -48,6 +49,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
+
 /**
  * Carbon specific optimization for late decode (convert dictionary key to value as late as
  * possible), which can improve the aggregation performance and reduce memory usage
@@ -55,17 +57,33 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
 private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   val PUSHED_FILTERS = "PushedFilters"
 
+  /*
+  Spark 2.3.1 plan there can be case of multiple projections like below
+  Project [substring(name, 1, 2)#124, name#123, tupleId#117, cast(rand(-6778822102499951904)#125
+  as string) AS rand(-6778822102499951904)#137]
+   +- Project [substring(name#123, 1, 2) AS substring(name, 1, 2)#124, name#123, UDF:getTupleId()
+    AS tupleId#117,
+       customdeterministicexpression(rand(-6778822102499951904)) AS rand(-6778822102499951904)#125]
+   +- Relation[imei#118,age#119,task#120L,num#121,level#122,name#123]
+   CarbonDatasourceHadoopRelation []
+ */
   def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
       case PhysicalOperation(projects, filters, l: LogicalRelation)
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
         val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-        pruneFilterProject(
-          l,
-          projects,
-          filters,
-          (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
-            a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil
+        // In Spark 2.3.1 there is case of multiple projections like below
+        // if 1 projection is failed then need to continue to other
+        try {
+          pruneFilterProject(
+            l,
+            projects,
+            filters,
+            (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
+              a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil
+        } catch {
+          case e: CarbonPhysicalPlanException => Nil
+        }
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
             !CarbonDictionaryDecoder.
@@ -157,17 +175,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     if (names.nonEmpty) {
       val partitionSet = AttributeSet(names
         .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
-      val partitionKeyFilters =
-        ExpressionSet(ExpressionSet(filterPredicates).filter(_.references.subsetOf(partitionSet)))
+      val partitionKeyFilters = CarbonToSparkAdapater
+        .getPartitionKeyFilter(partitionSet, filterPredicates)
       // Update the name with lower case as it is case sensitive while getting partition info.
       val updatedPartitionFilters = partitionKeyFilters.map { exp =>
         exp.transform {
           case attr: AttributeReference =>
-            AttributeReference(
+            CarbonToSparkAdapater.createAttributeReference(
               attr.name.toLowerCase,
               attr.dataType,
               attr.nullable,
-              attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+              attr.metadata,
+              attr.exprId,
+              attr.qualifier,
+              attr)
         }
       }
       partitions =
@@ -224,7 +245,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       }
     }
 
-    val (unhandledPredicates, pushedFilters) =
+    val (unhandledPredicates, pushedFilters, handledFilters ) =
       selectFilters(relation.relation, candidatePredicates)
 
     // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
@@ -232,10 +253,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     val handledSet = {
       val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
       val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
-      AttributeSet(handledPredicates.flatMap(_.references)) --
-      (projectSet ++ unhandledSet).map(relation.attributeMap)
+      try {
+        AttributeSet(handledPredicates.flatMap(_.references)) --
+        (projectSet ++ unhandledSet).map(relation.attributeMap)
+      } catch {
+        case e => throw new CarbonPhysicalPlanException
+      }
     }
-
     // Combines all Catalyst filter `Expression`s that are either not convertible to data source
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
@@ -309,6 +333,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         scanBuilder,
         candidatePredicates,
         pushedFilters,
+        handledFilters,
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -345,6 +370,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         scanBuilder,
         candidatePredicates,
         pushedFilters,
+        handledFilters,
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
@@ -359,9 +385,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       output: Seq[Attribute],
       partitions: Seq[PartitionSpec],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
+        ArrayBuffer[AttributeReference], Seq[PartitionSpec])
+        => RDD[InternalRow],
       candidatePredicates: Seq[Expression],
-      pushedFilters: Seq[Filter],
+      pushedFilters: Seq[Filter], handledFilters: Seq[Filter],
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
@@ -380,19 +407,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         relation.catalogTable.map(_.identifier), relation)
     } else {
-      RowDataSourceScanExec(output,
-        scanBuilder(updateRequestedColumns,
-          candidatePredicates,
-          pushedFilters,
-          needDecoder,
-          partitions),
-        relation.relation,
-        getPartitioning(table.carbonTable, updateRequestedColumns),
-        metadata,
-        relation.catalogTable.map(_.identifier))
+      val partition = getPartitioning(table.carbonTable, updateRequestedColumns)
+      val rdd = scanBuilder(updateRequestedColumns, candidatePredicates,
+        pushedFilters, needDecoder, partitions)
+      CarbonReflectionUtils.getRowDataSourceScanExecObj(relation, output,
+        pushedFilters, handledFilters,
+        rdd, partition, metadata)
     }
   }
 
+
   def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
       relation: CarbonDatasourceHadoopRelation,
       needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
@@ -471,7 +495,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   protected[sql] def selectFilters(
       relation: BaseRelation,
-      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Seq[Filter]) = {
 
     // In case of ComplexType dataTypes no filters should be pushed down. IsNotNull is being
     // explicitly added by spark and pushed. That also has to be handled and pushed back to
@@ -536,7 +560,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     // a filter to every row or not.
     val (_, translatedFilters) = translated.unzip
 
-    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters, handledFilters)
   }
 
   /**
@@ -701,3 +725,5 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 }
+
+class CarbonPhysicalPlanException extends Exception

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 0c9490d..4499b19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.strategy
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
@@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 
-/**
- * Carbon strategies for ddl commands
- */
+  /**
+   * Carbon strategies for ddl commands
+   * CreateDataSourceTableAsSelectCommand class has extra argument in
+   * 2.3, so need to add wrapper to match the case
+   */
+object MatchCreateDataSourceTable {
+  def unapply(plan: LogicalPlan): Option[(CatalogTable, SaveMode, LogicalPlan)] = plan match {
+    case t: CreateDataSourceTableAsSelectCommand => Some(t.table, t.mode, t.query)
+    case _ => None
+  }
+}
 
 class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
   val LOGGER: LogService =
@@ -231,7 +240,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd =
           CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
-      case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+      case MatchCreateDataSourceTable(tableDesc, mode, query)
         if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
            && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
                || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 97c37df..62e2d85 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCom
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -74,9 +74,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
             "Update operation is not supported for table which has index datamaps")
         }
       }
-      val tableRelation = if (SPARK_VERSION.startsWith("2.1")) {
+      val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
         relation
-      } else if (SPARK_VERSION.startsWith("2.2")) {
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         alias match {
           case Some(_) =>
             CarbonReflectionUtils.getSubqueryAlias(
@@ -206,9 +206,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           }
         }
         // include tuple id in subquery
-        if (SPARK_VERSION.startsWith("2.1")) {
+        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
           Project(projList, relation)
-        } else if (SPARK_VERSION.startsWith("2.2")) {
+        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
           alias match {
             case Some(_) =>
               val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
@@ -277,14 +277,13 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           case attr => attr
         }
       }
-      val version = SPARK_VERSION
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        if (version.startsWith("2.1")) {
+        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
           CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
-        } else if (version.startsWith("2.2")) {
+        } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
           CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
         } else {
-          throw new UnsupportedOperationException(s"Spark version $version is not supported")
+          throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
         }
       } else {
         Project(newChildOutput, child)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 70e61bc..1840c5d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,7 +23,6 @@ import java.net.URI
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,7 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -43,9 +43,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
@@ -72,6 +71,13 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
+object MatchLogicalRelation {
+  def unapply(logicalPlan: LogicalPlan): Option[(BaseRelation, Any, Any)] = logicalPlan match {
+    case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable)
+    case _ => None
+  }
+}
+
 class CarbonFileMetastore extends CarbonMetaStore {
 
   @transient
@@ -143,13 +149,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       sparkSession.catalog.currentDatabase)
     val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
       case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+      MatchLogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDatasourceHadoopRelation.carbonRelation
-      case LogicalRelation(
+      case MatchLogicalRelation(
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case SubqueryAlias(_, c)
-        if SPARK_VERSION.startsWith("2.2") &&
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
            (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
@@ -598,13 +604,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
     relation match {
       case SubqueryAlias(_,
-      LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+      MatchLogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDataSourceHadoopRelation
-      case LogicalRelation(
+      case MatchLogicalRelation(
       carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDataSourceHadoopRelation
       case SubqueryAlias(_, c)
-        if SPARK_VERSION.startsWith("2.2") &&
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
            (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName
               .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index c59246d..76ff41a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -273,11 +273,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
             case Some((_, childAttr)) =>
-              AttributeReference(
+              CarbonToSparkAdapater.createAttributeReference(
                 childAttr.name,
                 childAttr.dataType,
                 childAttr.nullable,
-                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+                childAttr.metadata,
+                childAttr.exprId,
+                attr.qualifier,
+                attr)
             case None =>
               attr
           }
@@ -296,11 +299,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
             case Some((_, childAttr)) =>
-              AttributeReference(
+              CarbonToSparkAdapater.createAttributeReference(
                 childAttr.name,
                 childAttr.dataType,
                 childAttr.nullable,
-                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+                childAttr.metadata,
+                childAttr.exprId,
+                attr.qualifier,
+                attr)
             case None =>
               attr
           }
@@ -777,24 +783,36 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         val factAlias = factPlanExpForStreaming(name)
         // create attribute reference object for each expression
         val attrs = factAlias.map { factAlias =>
-          AttributeReference(
+          CarbonToSparkAdapater.createAttributeReference(
             name,
             alias.dataType,
-            alias.nullable) (factAlias.exprId, alias.qualifier, alias.isGenerated)
+            alias.nullable,
+            Metadata.empty,
+            factAlias.exprId,
+            alias.qualifier,
+            alias)
         }
         // add aggregate function in Aggregate node added for handling streaming
         // to aggregate results from fact and aggregate table
         val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs)
         // same reference id will be used as it can be used by above nodes in the plan like
         // sort, project, join
-        Alias(
+        CarbonToSparkAdapater.createAliasRef(
           updatedAggExp.head,
-          name)(alias.exprId, alias.qualifier, Option(alias.metadata), alias.isGenerated)
+          name,
+          alias.exprId,
+          alias.qualifier,
+          Option(alias.metadata),
+          Some(alias))
       case alias@Alias(expression, name) =>
-        AttributeReference(
+        CarbonToSparkAdapater.createAttributeReference(
           name,
           alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+          alias.nullable,
+          Metadata.empty,
+          alias.exprId,
+          alias.qualifier,
+          alias)
     }
     updatedExp
   }
@@ -897,9 +915,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           case attr: AttributeReference =>
             newAggExp += attr
           case exp: Expression =>
-            newAggExp += Alias(
+            newAggExp += CarbonToSparkAdapater.createAliasRef(
               exp,
-              "dummy_" + counter)(NamedExpression.newExprId, None, None, false)
+              "dummy_" + counter,
+              NamedExpression.newExprId)
             counter = counter + 1
         }
       }
@@ -923,12 +942,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // get the new aggregate expression
         val newAggExp = getAggFunctionForFactStreaming(aggExp)
         val updatedExp = newAggExp.map { exp =>
-          Alias(exp,
-              name)(
-              NamedExpression.newExprId,
-              alias.qualifier,
+          CarbonToSparkAdapater.createAliasRef(exp,
+            name,
+            NamedExpression.newExprId,
+            alias.qualifier,
             Some(alias.metadata),
-              alias.isGenerated)
+            Some(alias))
         }
         // adding to map which will be used while Adding an Aggregate node for handling streaming
         // table plan change
@@ -936,10 +955,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         updatedExp
       case alias@Alias(exp: Expression, name) =>
         val newAlias = Seq(alias)
-        val attr = AttributeReference(name,
-            alias.dataType,
-            alias.nullable,
-            alias.metadata) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val attr = CarbonToSparkAdapater.createAttributeReference(name,
+          alias.dataType,
+          alias.nullable,
+          alias.metadata,
+          alias.exprId,
+          alias.qualifier,
+          alias)
         factPlanGrpExpForStreaming.put(
           AggExpToColumnMappingModel(
             removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes))),
@@ -1093,11 +1115,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   private def removeQualifiers(expression: Expression) : Expression = {
     expression.transform {
       case attr: AttributeReference =>
-        AttributeReference(
+        CarbonToSparkAdapater.createAttributeReference(
           attr.name,
           attr.dataType,
           attr.nullable,
-          attr.metadata)(attr.exprId, None, attr.isGenerated)
+          attr.metadata,
+          attr.exprId,
+          None,
+          attr)
     }
   }
 
@@ -1363,10 +1388,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes)
         val newExpressionId = NamedExpression.newExprId
-        val childTableAttr = AttributeReference(attr.name,
+        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(attr.name,
           childAttr.dataType,
           childAttr.nullable,
-          childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated)
+          childAttr.metadata,
+          newExpressionId,
+          childAttr.qualifier,
+          attr)
         updatedExpression.put(attr, childTableAttr)
         // returning the alias to show proper column name in output
         Seq(Alias(childAttr,
@@ -1378,12 +1406,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes)
         val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = AttributeReference(name,
+        val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
-        val childTableAttr = AttributeReference(name,
+          alias.nullable,
+          Metadata.empty,
+          alias.exprId,
+          alias.qualifier,
+          alias)
+        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+          alias.nullable,
+          Metadata.empty,
+          newExpressionId,
+          alias.qualifier,
+          alias)
         updatedExpression.put(parentTableAttr, childTableAttr)
         // returning alias with child attribute reference
         Seq(Alias(childAttr,
@@ -1409,13 +1445,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           val newExpressionId = NamedExpression.newExprId
           // create a parent attribute reference which will be replced on node which may be referred
           // by node like sort join
-          val parentTableAttr = AttributeReference(name,
+          val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
             alias.dataType,
-            alias.nullable)(alias.exprId, alias.qualifier, alias.isGenerated)
+            alias.nullable,
+            Metadata.empty,
+            alias.exprId,
+            alias.qualifier,
+            alias)
           // creating a child attribute reference which will be replced
-          val childTableAttr = AttributeReference(name,
+          val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
             alias.dataType,
-            alias.nullable)(newExpressionId, alias.qualifier, alias.isGenerated)
+            alias.nullable,
+            Metadata.empty,
+            newExpressionId,
+            alias.qualifier,
+            alias)
           // adding to map, will be used during other node updation like sort, join, project
           updatedExpression.put(parentTableAttr, childTableAttr)
           // returning alias with child attribute reference
@@ -1426,12 +1470,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // for streaming table
           // create alias for aggregate table
           val aggExpForStreaming = aggExp.map{ exp =>
-            Alias(exp,
-              name)(
+            CarbonToSparkAdapater.createAliasRef(exp,
+              name,
               NamedExpression.newExprId,
               alias.qualifier,
               Some(alias.metadata),
-              alias.isGenerated).asInstanceOf[NamedExpression]
+              Some(alias)).asInstanceOf[NamedExpression]
           }
           aggExpForStreaming
         }
@@ -1460,12 +1504,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             }
           }
         val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = AttributeReference(name,
+        val parentTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
-        val childTableAttr = AttributeReference(name,
+          alias.nullable,
+          Metadata.empty,
+          alias.exprId,
+          alias.qualifier,
+          alias)
+        val childTableAttr = CarbonToSparkAdapater.createAttributeReference(name,
           alias.dataType,
-          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+          alias.nullable,
+          Metadata.empty,
+          newExpressionId,
+          alias.qualifier,
+          alias)
         updatedExpression.put(parentTableAttr, childTableAttr)
         Seq(Alias(updatedExp, name)(newExpressionId,
           alias.qualifier).asInstanceOf[NamedExpression])
@@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
               // named expression list otherwise update the list and add it to set
               if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
                 namedExpressionList +=
-                Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+                CarbonToSparkAdapater.createAliasRef(expressions.head,
+                  name + "_ sum",
+                  NamedExpression.newExprId,
                   alias.qualifier,
                   Some(alias.metadata),
-                  alias.isGenerated)
+                  Some(alias))
                 validExpressionsMap += AggExpToColumnMappingModel(sumExp)
               }
               // check with same expression already count is present then do not add to
               // named expression list otherwise update the list and add it to set
               if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
                 namedExpressionList +=
-                Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
-                  alias.qualifier,
-                  Some(alias.metadata),
-                  alias.isGenerated)
+                CarbonToSparkAdapater.createAliasRef(expressions.last, name + "_ count",
+                    NamedExpression.newExprId,
+                    alias.qualifier,
+                    Some(alias.metadata),
+                    Some(alias))
                 validExpressionsMap += AggExpToColumnMappingModel(countExp)
               }
             } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 450902a..b96b6a7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -49,6 +50,7 @@ import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit}
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 
+
 /**
  * All filter conversions are done here.
  */
@@ -297,6 +299,27 @@ object CarbonFilters {
     filters.flatMap(translate(_, false)).toArray
   }
 
+  /**
+   * This API checks whether StringTrim object is compatible with
+   * carbon,carbon only deals with the space any other symbol should
+   * be ignored.So condition is SPARK version < 2.3.
+   * If it is 2.3 then trimStr field should be empty
+   *
+   * @param stringTrim
+   * @return
+   */
+  def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
+    var isCompatible = true
+    if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+        .asInstanceOf[Option[Expression]]
+      if (trimStr.isDefined) {
+        isCompatible = false
+      }
+    }
+    isCompatible
+  }
+
   def transformExpression(expr: Expression): CarbonExpression = {
     expr match {
       case Or(left, right)
@@ -385,7 +408,8 @@ object CarbonFilters {
           new CarbonLiteralExpression(maxValueLimit,
             CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)))
         new AndExpression(l, r)
-      case StringTrim(child) => transformExpression(child)
+      case strTrim: StringTrim if isStringTrimCompatibleWithCarbon(strTrim) =>
+        transformExpression(strTrim)
       case s: ScalaUDF =>
         new MatchExpression(s.children.head.toString())
       case _ =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index b81d0a1..48c6377 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -786,8 +786,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           p.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
-                a.explicitMetadata, a.isGenerated)
+              CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
+                a.name,
+                a.exprId,
+                a.qualifier,
+                a.explicitMetadata,
+                Some(a))
             case exp: NamedExpression
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
               CustomDeterministicExpression(exp)
@@ -800,8 +804,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           f.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
-                a.explicitMetadata, a.isGenerated)
+              CarbonToSparkAdapater.createAliasRef(CustomDeterministicExpression(exp),
+                a.name,
+                a.exprId,
+                a.qualifier,
+                a.explicitMetadata,
+                Some(a))
             case exp: NamedExpression
               if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
               CustomDeterministicExpression(exp)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 8eb47fc..1622724 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable}
+import org.apache.spark.sql.{CarbonToSparkAdapater, DeleteRecords, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -479,7 +479,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         logicalPlan match {
           case _: CarbonCreateTableCommand =>
             ExplainCommand(logicalPlan, extended = isExtended.isDefined)
-          case _ => ExplainCommand(OneRowRelation)
+          case _ => CarbonToSparkAdapater.getExplainCommandObj
         }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..d5fe6a4
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier,attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated:Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 7fa01e9..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.1 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-    private ColumnarBatch columnarBatch;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
-    }
-
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-    }
-
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
-    }
-
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.capacity();
-    }
-
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
-    }
-
-    public void resetDictionaryIds(int ordinal) {
-        columnarBatch.column(ordinal).getDictionaryIds().reset();
-    }
-
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        columnarBatch.reset();
-    }
-
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-        org.apache.spark.sql.types.DataType t = dataType(offset);
-        if (null == value) {
-            putNull(rowId, offset);
-        } else {
-            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                putBoolean(rowId, (boolean) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                putByte(rowId, (byte) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                putShort(rowId, (short) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                putLong(rowId, (long) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                putFloat(rowId, (float) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                putDouble(rowId, (double) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                UTF8String v = (UTF8String) value;
-                putByteArray(rowId, v.getBytes(), offset);
-            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-                DecimalType dt = (DecimalType) t;
-                Decimal d = Decimal.fromDecimal(value);
-                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                    putInt(rowId, (int) d.toUnscaledLong(), offset);
-                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                    putLong(rowId, d.toUnscaledLong(), offset);
-                } else {
-                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                    byte[] bytes = integer.toByteArray();
-                    putByteArray(rowId, bytes, 0, bytes.length, offset);
-                }
-            } else if (t instanceof CalendarIntervalType) {
-                CalendarInterval c = (CalendarInterval) value;
-                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-            } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                putLong(rowId, (long) value, offset);
-            }
-        }
-    }
-
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
-    }
-
-    public void putByte(int rowId, byte value, int ordinal) {
-        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
-    }
-
-    public void putShort(int rowId, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShort(rowId, (short) value);
-    }
-
-    public void putInt(int rowId, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInt(rowId, (int) value);
-    }
-
-    public void putFloat(int rowId, float value, int ordinal) {
-        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
-    }
-
-    public void putLong(int rowId, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLong(rowId, (long) value);
-    }
-
-    public void putDouble(int rowId, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
-    }
-
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShorts(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLongs(rowId, count, value);
-    }
-
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-
-    }
-
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        columnarBatch.column(ordinal).putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        columnarBatch.column(ordinal).putNulls(rowId, count);
-    }
-
-    public boolean isNullAt(int rowId, int ordinal) {
-        return columnarBatch.column(ordinal).isNullAt(rowId);
-    }
-
-    public DataType dataType(int ordinal) {
-        return columnarBatch.column(ordinal).dataType();
-    }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 989b1d5..dd690e4 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -34,11 +34,9 @@ import org.apache.spark.sql.types.StructType
 
 /**
  * Create table 'using carbondata' and insert the query result into it.
- *
  * @param table the Catalog Table
  * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
  * @param query the query whose result will be insert into the new relation
- *
  */
 
 case class CreateCarbonSourceTableAsSelectCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..7a68e3e
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier,attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated:Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq(OptimizeCodegen(conf))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 944b32e..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.2 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-    private ColumnarBatch columnarBatch;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
-    }
-
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-    }
-
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
-    }
-
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.capacity();
-    }
-
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
-    }
-
-    public Object reserveDictionaryIds(int capacity , int dummyOrdinal) {
-        return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
-    }
-
-    public void resetDictionaryIds(int ordinal) {
-        columnarBatch.column(ordinal).getDictionaryIds().reset();
-    }
-
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        columnarBatch.reset();
-    }
-
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-        org.apache.spark.sql.types.DataType t = dataType(offset);
-        if (null == value) {
-            putNull(rowId, offset);
-        } else {
-            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                putBoolean(rowId, (boolean) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                putByte(rowId, (byte) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                putShort(rowId, (short) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                putLong(rowId, (long) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                putFloat(rowId, (float) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                putDouble(rowId, (double) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                UTF8String v = (UTF8String) value;
-                putByteArray(rowId, v.getBytes(), offset);
-            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-                DecimalType dt = (DecimalType) t;
-                Decimal d = Decimal.fromDecimal(value);
-                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                    putInt(rowId, (int) d.toUnscaledLong(), offset);
-                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                    putLong(rowId, d.toUnscaledLong(), offset);
-                } else {
-                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                    byte[] bytes = integer.toByteArray();
-                    putByteArray(rowId, bytes, 0, bytes.length, offset);
-                }
-            } else if (t instanceof CalendarIntervalType) {
-                CalendarInterval c = (CalendarInterval) value;
-                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-            } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                putLong(rowId, (long) value, offset);
-            }
-        }
-    }
-
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
-    }
-
-    public void putByte(int rowId, byte value, int ordinal) {
-        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
-    }
-
-    public void putShort(int rowId, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShort(rowId, (short) value);
-    }
-
-    public void putInt(int rowId, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInt(rowId, (int) value);
-    }
-
-    public void putFloat(int rowId, float value, int ordinal) {
-        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
-    }
-
-    public void putLong(int rowId, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLong(rowId, (long) value);
-    }
-
-    public void putDouble(int rowId, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
-    }
-
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShorts(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLongs(rowId, count, value);
-    }
-
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
-    }
-
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        columnarBatch.column(ordinal).putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        columnarBatch.column(ordinal).putNulls(rowId, count);
-    }
-
-    public boolean isNullAt(int rowId, int ordinal) {
-        return columnarBatch.column(ordinal).isNullAt(rowId);
-    }
-
-    public DataType dataType(int ordinal) {
-        return columnarBatch.column(ordinal).dataType();
-    }
-}