Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/30 17:46:49 UTC

[3/6] incubator-carbondata git commit: add spark2 module

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
new file mode 100644
index 0000000..2468962
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.CarbonOption
+
+class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    // create a new table using dataframe's schema and write its content into the table
+    sqlContext.sparkSession.sql(makeCreateTableString(dataFrame.schema,
+    new CarbonOption(parameters)))
+    writeToCarbonFile(parameters)
+  }
+
+  def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    writeToCarbonFile(parameters)
+  }
+
+  private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    val options = new CarbonOption(parameters)
+    if (options.tempCSV) {
+      loadTempCSV(options)
+    } else {
+      loadDataFrame(options)
+    }
+  }
+
+  /**
+   * First saves the DataFrame to temporary CSV files,
+   * then loads those CSV files into Carbon.
+   *
+   * @param options carbon options such as the target database and table name
+   */
+  private def loadTempCSV(options: CarbonOption): Unit = {
+    // temporary solution: write to csv file, then load the csv into carbon
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
+      .append("tempCSV")
+      .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
+      .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
+      .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
+    writeToTempCSVFile(tempCSVFolder, options)
+
+    val tempCSVPath = new Path(tempCSVFolder)
+    val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
+
+    def countSize(): Double = {
+      var size: Double = 0
+      val itor = fs.listFiles(tempCSVPath, true)
+      while (itor.hasNext) {
+        val f = itor.next()
+        if (f.getPath.getName.startsWith("part-")) {
+          size += f.getLen
+        }
+      }
+      size
+    }
+
+    LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
+
+    try {
+      sqlContext.sql(makeLoadString(tempCSVFolder, options))
+    } finally {
+      fs.delete(tempCSVPath, true)
+    }
+  }
+
+  private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
+    var writer: DataFrameWriter[Row] =
+      dataFrame.write
+        .format(csvPackage)
+        .option("header", "false")
+        .mode(SaveMode.Overwrite)
+
+    if (options.compress) {
+      writer = writer.option("codec", "gzip")
+    }
+
+    writer.save(tempCSVFolder)
+  }
+
+  /**
+   * Loads the DataFrame directly, without writing it to temporary CSV files first.
+   * @param options
+   */
+  private def loadDataFrame(options: CarbonOption): Unit = {
+    val header = dataFrame.columns.mkString(",")
+    LoadTable(
+      Some(options.dbName),
+      options.tableName,
+      null,
+      Seq(),
+      Map("fileheader" -> header),
+      isOverwriteExist = false,
+      null,
+      Some(dataFrame)).run(sqlContext.sparkSession)
+  }
+
+  private def csvPackage: String = "com.databricks.spark.csv.newapi"
+
+  private def convertToCarbonType(sparkType: DataType): String = {
+    sparkType match {
+      case StringType => CarbonType.STRING.getName
+      case IntegerType => CarbonType.INT.getName
+      case ByteType => CarbonType.INT.getName
+      case ShortType => CarbonType.SHORT.getName
+      case LongType => CarbonType.LONG.getName
+      case FloatType => CarbonType.DOUBLE.getName
+      case DoubleType => CarbonType.DOUBLE.getName
+      case BooleanType => CarbonType.DOUBLE.getName
+      case TimestampType => CarbonType.TIMESTAMP.getName
+      case other => sys.error(s"unsupported type: $other")
+    }
+  }
+
+  private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
+    val carbonSchema = schema.map { field =>
+      s"${ field.name } ${ convertToCarbonType(field.dataType) }"
+    }
+    s"""
+          CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
+          (${ carbonSchema.mkString(", ") })
+          using 'org.apache.spark.sql.CarbonRelationProvider'
+      """
+  }
+
+  private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
+    if (options.useKettle) {
+      s"""
+          LOAD DATA INPATH '$csvFolder'
+          INTO TABLE ${options.dbName}.${options.tableName}
+          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
+      """
+    } else {
+      s"""
+          LOAD DATA INPATH '$csvFolder'
+          INTO TABLE ${options.dbName}.${options.tableName}
+          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}', 'USE_KETTLE' = 'false')
+      """
+    }
+  }
+
+}
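
For context, a minimal usage sketch (not part of this patch) of how a DataFrame write could reach this class through the data source API. The SparkSession setup, the Person case class, and the option keys ("tableName", "tempCSV") are assumptions for illustration; the keys actually recognized are defined by CarbonOption.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    // hypothetical record type used only for this sketch
    case class Person(name: String, age: Int)

    object CarbonWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("carbon-write-sketch").getOrCreate()
        import spark.implicits._
        val df = Seq(Person("a", 10), Person("b", 20)).toDF()

        // SaveMode.Overwrite routes to saveAsCarbonFile (create table, then load);
        // SaveMode.Append routes to appendToCarbonFile (load only).
        df.write
          .format("carbondata")                // shortName() of CarbonSource below
          .option("tableName", "person_table") // assumed option key
          .option("tempCSV", "false")          // load the DataFrame directly, skip temp CSV
          .mode(SaveMode.Overwrite)
          .save()
      }
    }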

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
new file mode 100644
index 0000000..24182ec
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
+import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.scan.expression.Expression
+import org.apache.carbondata.scan.expression.logical.AndExpression
+import org.apache.carbondata.spark.CarbonFilters
+import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+case class CarbonDatasourceHadoopRelation(
+    sparkSession: SparkSession,
+    paths: Array[String],
+    parameters: Map[String, String],
+    tableSchema: Option[StructType])
+  extends BaseRelation with PrunedFilteredScan {
+
+  lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
+  lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
+  lazy val carbonRelation: CarbonRelation = {
+    CarbonRelation(
+      carbonTable.getDatabaseName,
+      carbonTable.getFactTableName,
+      CarbonSparkUtil.createSparkMeta(carbonTable),
+      new TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable),
+      None
+    )
+  }
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val job = new Job(new JobConf())
+    val conf = new Configuration(job.getConfiguration)
+    val filterExpression: Option[Expression] = filters.flatMap { filter =>
+      CarbonFilters.createCarbonFilter(schema, filter)
+    }.reduceOption(new AndExpression(_, _))
+
+    val projection = new CarbonProjection
+    requiredColumns.foreach(projection.addColumn)
+    CarbonInputFormat.setColumnProjection(conf, projection)
+    CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf)
+
+    new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
+      absIdentifier, carbonTable)
+  }
+
+}
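
For context, a hypothetical query sketch (not part of this patch) showing what this relation enables: Spark prunes the projection to the required columns and pushes translatable filters into buildScan. The table name default.person_table is a placeholder for a table already created through the carbondata source.

    import org.apache.spark.sql.SparkSession

    object CarbonPrunedScanSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("carbon-scan-sketch").getOrCreate()

        // only 'name' ends up in the CarbonProjection; the age predicate is handed to
        // CarbonFilters.createCarbonFilter and pushed down when it can be translated
        val pruned = spark.sql("SELECT name FROM default.person_table WHERE age > 18")
        pruned.explain(true)   // physical plan shows the scan backed by CarbonScanRDD
        pruned.show()
      }
    }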

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
new file mode 100644
index 0000000..d05aefd
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors.attachTree
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.hive.{CarbonMetastoreTypes, CarbonRelation}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.querystatistics._
+import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
+/**
+ * Decodes dictionary-encoded column values in each row
+ * back to their original values.
+ */
+case class CarbonDictionaryDecoder(
+    relations: Seq[CarbonDecoderRelation],
+    profile: CarbonProfile,
+    aliasMap: CarbonAliasDecoderRelation,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override val output: Seq[Attribute] = {
+    child.output.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if(relation.isDefined && canBeDecoded(attr)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension = carbonTable
+          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          val newAttr = AttributeReference(a.name,
+            convertCarbonToSparkDataType(carbonDimension,
+              relation.get.carbonRelation.carbonRelation),
+            a.nullable,
+            a.metadata)(a.exprId).asInstanceOf[Attribute]
+          newAttr
+        } else {
+          a
+        }
+      } else {
+        a
+      }
+    }
+  }
+
+
+  def canBeDecoded(attr: Attribute): Boolean = {
+    profile match {
+      case ip: IncludeProfile if ip.attributes.nonEmpty =>
+        ip.attributes
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case ep: ExcludeProfile =>
+        !ep.attributes
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case _ => true
+    }
+  }
+
+  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+      relation: CarbonRelation): types.DataType = {
+    carbonDimension.getDataType match {
+      case DataType.STRING => StringType
+      case DataType.SHORT => ShortType
+      case DataType.INT => IntegerType
+      case DataType.LONG => LongType
+      case DataType.DOUBLE => DoubleType
+      case DataType.BOOLEAN => BooleanType
+      case DataType.DECIMAL =>
+        val scale: Int = carbonDimension.getColumnSchema.getScale
+        val precision: Int = carbonDimension.getColumnSchema.getPrecision
+        if (scale == 0 && precision == 0) {
+          DecimalType(18, 2)
+        } else {
+          DecimalType(precision, scale)
+        }
+      case DataType.TIMESTAMP => TimestampType
+      case DataType.STRUCT =>
+        CarbonMetastoreTypes
+          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+      case DataType.ARRAY =>
+        CarbonMetastoreTypes
+          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+    }
+  }
+
+  val getDictionaryColumnIds = {
+    val attributes = child.output
+    val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if(relation.isDefined && canBeDecoded(attr)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension =
+          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+            carbonDimension.getDataType)
+        } else {
+          (null, null, null)
+        }
+      } else {
+        (null, null, null)
+      }
+
+    }.toArray
+    dictIds
+  }
+
+  override def doExecute(): RDD[InternalRow] = {
+    attachTree(this, "execute") {
+      val storePath = CarbonEnv.get.carbonMetastore.storePath
+      val absoluteTableIdentifiers = relations.map { relation =>
+        val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
+        (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+      }.toMap
+
+      if (isRequiredToDecode) {
+        val dataTypes = child.output.map { attr => attr.dataType }
+        child.execute().mapPartitions { iter =>
+          val cacheProvider: CacheProvider = CacheProvider.getInstance
+          val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+            cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+          val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+            forwardDictionaryCache)
+          val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+          // add a task completion listener to clear the dictionaries; clearing them
+          // is a decisive factor for the LRU eviction policy
+          val dictionaryTaskCleaner = TaskContext.get
+          dictionaryTaskCleaner.addTaskCompletionListener(context =>
+            dicts.foreach { dictionary =>
+              if (null != dictionary) {
+                dictionary.clear
+              }
+            }
+          )
+          new Iterator[InternalRow] {
+            val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+            var flag = true
+            var total = 0L
+            override final def hasNext: Boolean = iter.hasNext
+            override final def next(): InternalRow = {
+              val startTime = System.currentTimeMillis()
+              val row: InternalRow = iter.next()
+              val data = row.toSeq(dataTypes).toArray
+              dictIndex.foreach { index =>
+                if (data(index) != null) {
+                  data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+                    .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                    getDictionaryColumnIds(index)._3)
+                }
+              }
+              val result = unsafeProjection(new GenericMutableRow(data))
+              total += System.currentTimeMillis() - startTime
+              result
+            }
+          }
+        }
+      } else {
+        child.execute()
+      }
+    }
+  }
+
+  private def isRequiredToDecode = {
+    getDictionaryColumnIds.find(p => p._1 != null) match {
+      case Some(value) => true
+      case _ => false
+    }
+  }
+
+  private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
+      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+    val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
+      if (f._2 != null) {
+        try {
+          cache.get(new DictionaryColumnUniqueIdentifier(
+            atiMap(f._1).getCarbonTableIdentifier,
+            f._2, f._3))
+        } catch {
+          case _: Throwable => null
+        }
+      } else {
+        null
+      }
+    }
+    dicts
+  }
+
+}
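
A small standalone illustration (plain Scala, no Carbon APIs) of the late-decode idea this operator implements: rows carry compact integer surrogate keys, work happens on the keys, and each key is mapped back to its original value only at the end, analogous to Dictionary.getDictionaryValueForKey per dictionary-encoded column.

    object DictionaryDecodeSketch {
      def main(args: Array[String]): Unit = {
        // surrogate key -> original value for one column (illustrative values)
        val cityDictionary = Map(1 -> "beijing", 2 -> "shanghai", 3 -> "shenzhen")

        // encoded rows: (cityKey, sales); aggregation runs on the compact int keys
        val encodedRows = Seq((1, 10L), (2, 5L), (1, 7L))
        val aggregated = encodedRows.groupBy(_._1).map { case (key, rows) =>
          (key, rows.map(_._2).sum)
        }

        // decode as late as possible, once per output row
        val decoded = aggregated.map { case (key, sum) => (cityDictionary(key), sum) }
        decoded.foreach(println)   // e.g. (beijing,17) and (shanghai,5); order may vary
      }
    }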

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
new file mode 100644
index 0000000..8028908
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.sql.hive.{CarbonMetastore, DistributionUtil}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Carbon Environment for unified context
+ */
+case class CarbonEnv(carbonMetastore: CarbonMetastore)
+
+object CarbonEnv extends Logging {
+
+  @volatile private var carbonEnv: CarbonEnv = _
+
+  var initialized = false
+
+  def init(sqlContext: SQLContext): Unit = {
+    if (!initialized) {
+      val catalog = {
+        val storePath = sqlContext.sparkSession.conf.get(
+        CarbonCommonConstants.STORE_LOCATION, "/user/hive/warehouse/carbonstore")
+        new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
+      }
+      carbonEnv = CarbonEnv(catalog)
+      DistributionUtil.numExistingExecutors = sqlContext.sparkContext.schedulerBackend match {
+        case b: CoarseGrainedSchedulerBackend => b.getExecutorIds().length
+        case _ => 0
+      }
+      initialized = true
+    }
+  }
+
+  def get: CarbonEnv = {
+    carbonEnv
+  }
+}
+
+
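
A hypothetical initialization sketch (not part of this patch): setting the store location before calling CarbonEnv.init so that the CarbonMetastore is created against the intended path. The /tmp/carbonstore path is a placeholder; the configuration key is CarbonCommonConstants.STORE_LOCATION, as used above.

    import org.apache.spark.sql.{CarbonEnv, SparkSession}

    import org.apache.carbondata.core.constants.CarbonCommonConstants

    object CarbonEnvInitSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("carbon-env-sketch")
          .config(CarbonCommonConstants.STORE_LOCATION, "/tmp/carbonstore") // placeholder path
          .getOrCreate()

        CarbonEnv.init(spark.sqlContext)   // idempotent: guarded by the 'initialized' flag
        println(s"carbon store path: ${CarbonEnv.get.carbonMetastore.storePath}")
      }
    }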

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index cb0b9a5..9e42b44 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.hive.CarbonRelation
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.scan.model._
 import org.apache.carbondata.spark.CarbonFilters
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.hive.CarbonRelation
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 case class CarbonScan(
     var attributesRaw: Seq[Attribute],

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
new file mode 100644
index 0000000..fb87ba2
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.language.implicitConversions
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.command.{CreateTable, Field}
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.CarbonOption
+
+/**
+ * Carbon relation provider compliant to data source api.
+ * Creates carbon relations
+ */
+class CarbonSource extends CreatableRelationProvider
+    with SchemaRelationProvider with DataSourceRegister {
+
+  override def shortName(): String = "carbondata"
+
+  // called by write operations such as INSERT INTO or the DataFrame.write API
+  override def createRelation(
+                               sqlContext: SQLContext,
+                               mode: SaveMode,
+                               parameters: Map[String, String],
+                               data: DataFrame): BaseRelation = {
+    CarbonEnv.init(sqlContext)
+    // The user should not specify 'path' since only one store is currently supported in carbon;
+    // once multiple stores are supported, this limitation can be removed
+    require(!parameters.contains("path"), "'path' should not be specified, " +
+        "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
+
+    val options = new CarbonOption(parameters)
+    val storePath = sqlContext.sparkSession.conf.get(CarbonCommonConstants.STORE_LOCATION)
+    val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
+    val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        .exists(tablePath)
+    val (doSave, doAppend) = (mode, isExists) match {
+      case (SaveMode.ErrorIfExists, true) =>
+        sys.error(s"ErrorIfExists mode, path $storePath already exists.")
+      case (SaveMode.Overwrite, true) =>
+        sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
+        (true, false)
+      case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
+        (true, false)
+      case (SaveMode.Append, _) =>
+        (false, true)
+      case (SaveMode.Ignore, exists) =>
+        (!exists, false)
+    }
+
+    if (doSave) {
+      // save data when the save mode is Overwrite.
+      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(parameters)
+    } else if (doAppend) {
+      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(parameters)
+    }
+
+    createRelation(sqlContext, parameters, data.schema)
+  }
+
+  // called by DDL operation with a USING clause
+  override def createRelation(
+                               sqlContext: SQLContext,
+                               parameters: Map[String, String],
+                               dataSchema: StructType): BaseRelation = {
+    CarbonEnv.init(sqlContext)
+    addLateDecodeOptimization(sqlContext.sparkSession)
+    val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
+    CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
+      Option(dataSchema))
+
+  }
+
+  private def addLateDecodeOptimization(ss: SparkSession): Unit = {
+    ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
+    ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+  }
+
+  private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
+                                     dataSchema: StructType): String = {
+    val (dbName, tableName) = parameters.get("path") match {
+      case Some(path) =>
+        val p = path.split(File.separator)
+        ("default", p(p.length - 1))
+      case _ => throw new Exception("do not have dbname and tablename for carbon table")
+    }
+    try {
+      CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+    } catch {
+      case ex: NoSuchTableException =>
+        val fields = dataSchema.map { col =>
+          val column = col.name
+          val dataType = Option(col.dataType.toString)
+          val name = Option(col.name)
+          // This is to parse complex data types
+          val x = col.name + ' ' + col.dataType
+          val f: Field = Field(column, dataType, name, None, null)
+          // a decimal data type is reported as e.g. decimal(10,0),
+          // so check the prefix of the string, extract the precision and scale,
+          // and reset the data type to plain "decimal"
+          if (f.dataType.getOrElse("").startsWith("decimal")) {
+            val (precision, scale) = TableCreator.getScaleAndPrecision(col.dataType.toString)
+            f.precision = precision
+            f.scale = scale
+            f.dataType = Some("decimal")
+          }
+          f
+        }
+        val map = scala.collection.mutable.Map[String, String]();
+        parameters.foreach { x => map.put(x._1, x._2) }
+        val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
+        CreateTable(cm).run(sparkSession)
+        CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+      case _ => throw new Exception("do not have dbname and tablename for carbon table")
+    }
+  }
+
+}
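
A standalone restatement (plain Scala) of the (SaveMode, tableExists) decision implemented in createRelation above; the returned pair is (doSave, doAppend).

    import org.apache.spark.sql.SaveMode

    object SaveModeDecisionSketch {
      def decide(mode: SaveMode, tableExists: Boolean): (Boolean, Boolean) =
        (mode, tableExists) match {
          case (SaveMode.ErrorIfExists, true)  => sys.error("table already exists")
          case (SaveMode.Overwrite, true)      => (true, false)  // drop, recreate, save
          case (SaveMode.Overwrite, false) |
               (SaveMode.ErrorIfExists, false) => (true, false)
          case (SaveMode.Append, _)            => (false, true)
          case (SaveMode.Ignore, exists)       => (!exists, false)
        }

      def main(args: Array[String]): Unit = {
        println(decide(SaveMode.Append, tableExists = true))  // (false,true): append only
        println(decide(SaveMode.Ignore, tableExists = true))  // (false,false): do nothing
      }
    }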

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
new file mode 100644
index 0000000..284af3d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.{ArrayList, List}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
+
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
+import org.apache.carbondata.scan.expression.conditional.ConditionalExpression
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException
+import org.apache.carbondata.scan.filter.intf.{ExpressionType, RowIntf}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+class SparkUnknownExpression(var sparkExp: SparkExpression)
+  extends UnknownExpression with ConditionalExpression {
+
+  private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
+  private var isExecutor: Boolean = false
+  children.addAll(getColumnList())
+
+  override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
+
+    val values = carbonRowInstance.getValues.toSeq.map {
+      case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+      case d: java.math.BigDecimal =>
+        val javaDecVal = new java.math.BigDecimal(d.toString)
+        val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
+        val decConverter = new org.apache.spark.sql.types.Decimal()
+        decConverter.set(scalaDecVal)
+      case value => value
+    }
+    try {
+      val result = evaluateExpression(
+        new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+      val sparkRes = if (isExecutor) {
+        result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
+      } else {
+        result
+      }
+      new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
+        sparkRes
+      )
+    } catch {
+      case e: Exception => throw new FilterUnsupportedException(e.getMessage)
+    }
+  }
+
+  override def getFilterExpressionType: ExpressionType = {
+    ExpressionType.UNKNOWN
+  }
+
+  override def getString: String = {
+    sparkExp.toString()
+  }
+
+  def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
+    this.evaluateExpression = evaluateExpression
+    isExecutor = true
+  }
+
+  def getColumnList: java.util.List[ColumnExpression] = {
+
+    val lst = new java.util.ArrayList[ColumnExpression]()
+    getColumnListFromExpressionTree(sparkExp, lst)
+    lst
+  }
+  def getLiterals: java.util.List[ExpressionResult] = {
+
+    val lst = new java.util.ArrayList[ExpressionResult]()
+    lst
+  }
+
+  def getAllColumnList: java.util.List[ColumnExpression] = {
+    val lst = new java.util.ArrayList[ColumnExpression]()
+    getAllColumnListFromExpressionTree(sparkExp, lst)
+    lst
+  }
+
+  def isSingleDimension: Boolean = {
+    val lst = new java.util.ArrayList[ColumnExpression]()
+    getAllColumnListFromExpressionTree(sparkExp, lst)
+    if (lst.size == 1 && lst.get(0).isDimension) {
+      true
+    } else {
+      false
+    }
+  }
+
+  def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
+      list: java.util.List[ColumnExpression]): Unit = {
+    sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
+  }
+
+
+  def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
+      list: List[ColumnExpression]): List[ColumnExpression] = {
+    sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
+    list
+  }
+
+  def isDirectDictionaryColumns: Boolean = {
+    val lst = new ArrayList[ColumnExpression]()
+    getAllColumnListFromExpressionTree(sparkExp, lst)
+    if (lst.get(0).getCarbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      true
+    } else {
+      false
+    }
+  }
+}
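
A standalone illustration of the value conversion performed at the start of evaluate() above, before the values are handed to the wrapped Spark expression: Strings become UTF8String and java.math.BigDecimal becomes Spark's Decimal; everything else passes through unchanged.

    import org.apache.spark.sql.types.Decimal
    import org.apache.spark.unsafe.types.UTF8String

    object ValueConversionSketch {
      def toSparkInternal(value: Any): Any = value match {
        case s: String => UTF8String.fromString(s)
        case d: java.math.BigDecimal =>
          val converter = new Decimal()
          converter.set(scala.math.BigDecimal(d))
        case other => other
      }

      def main(args: Array[String]): Unit = {
        println(toSparkInternal("carbon"))                         // UTF8String "carbon"
        println(toSparkInternal(new java.math.BigDecimal("1.25"))) // Decimal 1.25
      }
    }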

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
new file mode 100644
index 0000000..14decdb
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.regex.{Matcher, Pattern}
+
+import scala.collection.mutable.{LinkedHashSet, Map}
+
+import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField, TableModel}
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+object TableCreator {
+
+  // detects whether complex dimension is part of dictionary_exclude
+  def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
+    val dimensionType = Array("array", "struct")
+    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
+  }
+
+  // detects whether a string or timestamp column is part of dictionary_exclude
+  def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
+    val dataTypes = Array("string", "timestamp")
+    dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
+  }
+
+  // detect dimension data type
+  def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
+    val dimensionType =
+      Array("string", "stringtype", "array", "arraytype", "struct",
+        "structtype", "timestamp", "timestamptype")
+    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
+  }
+
+  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
+                                                    tableProperties: Map[String, String]):
+  (Seq[Field], Seq[String]) = {
+    var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var dictExcludeCols: Array[String] = Array[String]()
+    var noDictionaryDims: Seq[String] = Seq[String]()
+    var dictIncludeCols: Seq[String] = Seq[String]()
+
+    // All excluded columns must be present among the columns of the create table statement
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+      dictExcludeCols
+        .map { dictExcludeCol =>
+          if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
+            val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
+              " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          } else {
+            val dataType = fields.find(x =>
+              x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
+            if (isComplexDimDictionaryExclude(dataType)) {
+              val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
+                dictExcludeCol
+              throw new MalformedCarbonCommandException(errormsg)
+            } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
+              val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
+                " data type column: " + dictExcludeCol
+              throw new MalformedCarbonCommandException(errorMsg)
+            }
+          }
+        }
+    }
+    // All included columns must be present among the columns of the create table statement
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
+      dictIncludeCols.map { distIncludeCol =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+          val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+            " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        }
+      }
+    }
+
+    // include cols should not contain any of the exclude cols
+    dictExcludeCols.foreach { dicExcludeCol =>
+      if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
+        val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
+          " with DICTIONARY_INCLUDE. Please check create table statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+
+    // by default all string cols are dims; cols in dictionary_exclude are also added to the
+    // noDictionaryDims list, and all dictionary exclude/include cols are treated as dims
+    fields.foreach(field => {
+
+      if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
+        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP) {
+          noDictionaryDims :+= field.column
+        }
+        dimFields += field
+      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        dimFields += (field)
+      } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+        dimFields += (field)
+      }
+    }
+    )
+
+    (dimFields.toSeq, noDictionaryDims)
+  }
+
+  /**
+   * Extracts the measure column fields. By default all non-string columns are measures.
+   *
+   * @param fields
+   * @param tableProperties
+   * @return
+   */
+  protected def extractMsrColsFromFields(fields: Seq[Field],
+                                         tableProperties: Map[String, String]): Seq[Field] = {
+    var msrFields: Seq[Field] = Seq[Field]()
+    var dictIncludedCols: Array[String] = Array[String]()
+    var dictExcludedCols: Array[String] = Array[String]()
+
+    // get all included cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+    }
+
+    // get all excluded cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+    }
+
+    // by default all non-string cols are measures; all include/exclude cols are treated as dims
+    fields.foreach(field => {
+      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
+        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
+          !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
+          msrFields :+= field
+        }
+      }
+    })
+
+    msrFields
+  }
+
+  def getKey(parentColumnName: Option[String],
+             columnName: String): (String, String) = {
+    if (parentColumnName.isDefined) {
+      if (columnName == "val") {
+        (parentColumnName.get, parentColumnName.get + "." + columnName)
+      } else {
+        (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
+      }
+    } else {
+      (columnName, columnName)
+    }
+  }
+
+  protected def fillColumnProperty(
+      parentColumnName: Option[String],
+      columnName: String,
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+    val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
+    val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
+    if (colProps.isDefined) {
+      colPropMap.put(colProKey, colProps.get)
+    }
+  }
+
+  protected def fillAllChildrenColumnProperty(
+      parent: String,
+      fieldChildren: Option[List[Field]],
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+    fieldChildren.foreach { fields =>
+      fields.foreach { field =>
+        fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
+      }
+    }
+  }
+
+  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
+  java.util.Map[String, java.util.List[ColumnProperty]] = {
+    val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
+    fields.foreach { field =>
+      if (field.children.isDefined && field.children.get != null) {
+        fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
+      } else {
+        fillColumnProperty(None, field.column, tableProperties, colPropMap)
+      }
+    }
+    colPropMap
+  }
+
+  def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
+    // if the columns in a column group are not in schema order, arrange them in schema order
+    var colGrpFieldIndx: Seq[Int] = Seq[Int]()
+    colGroup.split(',').map(_.trim).foreach { x =>
+      dims.zipWithIndex.foreach { dim =>
+        if (dim._1.column.equalsIgnoreCase(x)) {
+          colGrpFieldIndx :+= dim._2
+        }
+      }
+    }
+    // sort it
+    colGrpFieldIndx = colGrpFieldIndx.sorted
+    // check if the columns in the column group are in schema order
+    if (!checkIfInSequence(colGrpFieldIndx)) {
+      throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
+    }
+    def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
+      for (i <- 0 until (colGrpFieldIndx.length - 1)) {
+        if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
+          throw new MalformedCarbonCommandException(
+            "Invalid column group,column in group should be contiguous as per schema.")
+        }
+      }
+      true
+    }
+    val colGrpNames: StringBuilder = StringBuilder.newBuilder
+    for (i <- colGrpFieldIndx.indices) {
+      colGrpNames.append(dims(colGrpFieldIndx(i)).column)
+      if (i < (colGrpFieldIndx.length - 1)) {
+        colGrpNames.append(",")
+      }
+    }
+    colGrpNames.toString()
+  }
+
+  /**
+   * Extract the column groups configuration from table properties.
+   * Based on this, the column groups of fields will be determined.
+   *
+   * @param tableProperties
+   * @return
+   */
+  protected def updateColumnGroupsInField(tableProperties: Map[String, String],
+                                          noDictionaryDims: Seq[String],
+                                          msrs: Seq[Field],
+                                          dims: Seq[Field]): Seq[String] = {
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
+
+      var splittedColGrps: Seq[String] = Seq[String]()
+      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+
+      // column groups are specified in table properties like "(col1,col2),(col3,col4)";
+      // the value is first split on the parentheses, so the above becomes two strings:
+      // [col1,col2] [col3,col4]
+      val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
+      while (m.find()) {
+        val oneGroup: String = m.group(1)
+        CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
+        val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
+        splittedColGrps :+= arrangedColGrp
+      }
+      // This will be handled further.
+      CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
+    } else {
+      null
+    }
+  }
+
+  private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
+    var complexDimensions: Seq[Field] = Seq()
+    var dimensions: Seq[Field] = Seq()
+    dims.foreach { dimension =>
+      dimension.dataType.getOrElse("NIL") match {
+        case "Array" => complexDimensions = complexDimensions :+ dimension
+        case "Struct" => complexDimensions = complexDimensions :+ dimension
+        case _ => dimensions = dimensions :+ dimension
+      }
+    }
+    dimensions ++ complexDimensions
+  }
+
+  /**
+   * Extracts the columns for which no inverted index should be created.
+   * By default all dimensions use an inverted index.
+   *
+   * @param fields
+   * @param tableProperties
+   * @return
+   */
+  protected def extractNoInvertedIndexColumns(fields: Seq[Field],
+                                              tableProperties: Map[String, String]):
+  Seq[String] = {
+    // check whether the column name is in fields
+    var noInvertedIdxColsProps: Array[String] = Array[String]()
+    var noInvertedIdxCols: Seq[String] = Seq[String]()
+
+    if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
+      noInvertedIdxColsProps =
+        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
+      noInvertedIdxColsProps
+        .map { noInvertedIdxColProp =>
+          if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
+            val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
+              " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+    }
+    // remove duplicate columns so that each column is considered only once
+    val distinctCols = noInvertedIdxColsProps.toSet
+    // extract the no inverted index columns
+    fields.foreach(field => {
+      if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        noInvertedIdxCols :+= field.column
+      }
+    }
+    )
+    noInvertedIdxCols
+  }
+
+  private def normalizeType(field: Field): Field = {
+    val dataType = field.dataType.getOrElse("NIL")
+    dataType match {
+      case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
+        field.storeType
+      )
+      case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null),
+        field.parent, field.storeType
+      )
+      case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
+        field.storeType
+      )
+      case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
+        field.storeType
+      )
+      case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null),
+        field.parent, field.storeType
+      )
+      case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
+        field.storeType
+      )
+      case "array" => Field(field.column, Some("Array"), field.name,
+        field.children.map(f => f.map(normalizeType(_))),
+        field.parent, field.storeType
+      )
+      case "struct" => Field(field.column, Some("Struct"), field.name,
+        field.children.map(f => f.map(normalizeType(_))),
+        field.parent, field.storeType
+      )
+      case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
+        field.storeType
+      )
+      case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
+        field.storeType, field.precision, field.scale
+      )
+      // the data type may be given as e.g. decimal(10,0); in that case extract
+      // the precision and scale from the string and reset the data type
+      // to Decimal with those values.
+      case _ if (dataType.startsWith("decimal")) =>
+        val (precision, scale) = getScaleAndPrecision(dataType)
+        Field(field.column,
+          Some("Decimal"),
+          field.name,
+          Some(null),
+          field.parent,
+          field.storeType, precision,
+          scale
+        )
+      case _ =>
+        field
+    }
+  }
+
+  private def appendParentForEachChild(field: Field, parentName: String): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case "String" => Field(parentName + "." + field.column, Some("String"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Long" => Field(parentName + "." + field.column, Some("Long"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Double" => Field(parentName + "." + field.column, Some("Double"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Array" => Field(parentName + "." + field.column, Some("Array"),
+        Some(parentName + "." + field.name.getOrElse(None)),
+        field.children
+          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+        parentName)
+      case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
+        Some(parentName + "." + field.name.getOrElse(None)),
+        field.children
+          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+        parentName)
+      case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
+        field.storeType, field.precision, field.scale)
+      case _ => field
+    }
+  }
+
+  private def addParent(field: Field): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case "Array" => Field(field.column, Some("Array"), field.name,
+        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+        field.storeType)
+      case "Struct" => Field(field.column, Some("Struct"), field.name,
+        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+        field.storeType)
+      case _ => field
+    }
+  }
+
+  def getScaleAndPrecision(dataType: String): (Int, Int) = {
+    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
+    m.find()
+    val matchedString: String = m.group(1)
+    val scaleAndPrecision = matchedString.split(",")
+    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
+  }
+
+  def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
+                        , tableName: String, fields: Seq[Field],
+                        partitionCols: Seq[PartitionerField],
+                        tableProperties: Map[String, String]): TableModel
+  = {
+
+    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+      fields, tableProperties)
+    if (dims.isEmpty) {
+      throw new MalformedCarbonCommandException(
+        s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
+        "cannot be created without key columns. Please use DICTIONARY_INCLUDE or " +
+        "DICTIONARY_EXCLUDE to set at least one key column " +
+        "if all specified columns are numeric types")
+    }
+    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+
+    // column properties
+    val colProps = extractColumnProperties(fields, tableProperties)
+    // get column groups configuration from table properties.
+    val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
+      noDictionaryDims, msrs, dims)
+
+    // get no inverted index columns from table properties.
+    val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+
+    // validate the tableBlockSize from table properties
+    CommonUtil.validateTableBlockSize(tableProperties)
+
+    TableModel(ifNotExistPresent,
+      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+      dbName,
+      tableName,
+      tableProperties,
+      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
+      msrs.map(f => normalizeType(f)),
+      Option(noDictionaryDims),
+      Option(noInvertedIdxCols),
+      groupCols,
+      Some(colProps))
+  }
+
+}
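
A hedged illustration of the key-column requirement enforced by prepareTableModel above: if every column in a CREATE TABLE statement is numeric, at least one column must be declared a dictionary (key) column through the table properties, otherwise the MalformedCarbonCommandException is raised. The table name, columns, and DDL shape below are made up for illustration and follow the syntax documented for the existing spark integration; the grammar accepted by this new spark2 parser may differ in detail.

    // Illustrative only (assumes a SparkSession named "spark" with Carbon support):
    // both columns are numeric, so DICTIONARY_INCLUDE forces "id" to be treated as a
    // key column and prepareTableModel ends up with a non-empty dimension list.
    spark.sql(
      """
        | CREATE TABLE IF NOT EXISTS default.sales (id INT, amount DOUBLE)
        | STORED BY 'carbondata'
        | TBLPROPERTIES ('DICTIONARY_INCLUDE'='id')
      """.stripMargin)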

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index c2e3915..4ae8d61 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,114 +17,24 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
-import org.apache.spark.sql._
+import org.apache.spark.sql.{CarbonDictionaryCatalystDecoder, CarbonDictionaryDecoder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
-import org.apache.spark.sql.types.IntegerType
 
-///**
-// * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
-// * can improve the aggregation performance and reduce memory usage
-// */
-//private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
-//  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-//    plan match {
-//      case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-//        CarbonDictionaryDecoder(relations,
-//          profile,
-//          aliasMap,
-//          planLater(child)
-//        ) :: Nil
-//      case _ => Nil
-//    }
-//  }
-//    /**
-//      * Create carbon scan
-//     */
-//  private def carbonRawScan(projectList: Seq[NamedExpression],
-//      predicates: Seq[Expression],
-//      logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-//
-//    val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-//    val tableName: String =
-//      relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
-//    // Check out any expressions are there in project list. if they are present then we need to
-//    // decode them as well.
-//    val projectSet = AttributeSet(projectList.flatMap(_.references))
-//    val scan = CarbonScan(projectSet.toSeq, relation.carbonRelation, predicates)
-//    projectList.map {
-//      case attr: AttributeReference =>
-//      case Alias(attr: AttributeReference, _) =>
-//      case others =>
-//        others.references.map{f =>
-//          val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name)
-//          if (dictionary.isDefined && dictionary.get) {
-//            scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])
-//          }
-//        }
-//    }
-//    if (scan.attributesNeedToDecode.size() > 0) {
-//      val decoder = getCarbonDecoder(logicalRelation,
-//        sc,
-//        tableName,
-//        scan.attributesNeedToDecode.asScala.toSeq,
-//        scan)
-//      if (scan.unprocessedExprs.nonEmpty) {
-//        val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-//        ProjectExec(projectList, filterCondToAdd.map(FilterExec(_, decoder)).getOrElse(decoder))
-//      } else {
-//        ProjectExec(projectList, decoder)
-//      }
-//    } else {
-//      ProjectExec(projectList, scan)
-//    }
-//  }
-//
-//  def getCarbonDecoder(logicalRelation: LogicalRelation,
-//      sc: SQLContext,
-//      tableName: String,
-//      projectExprsNeedToDecode: Seq[Attribute],
-//      scan: CarbonScan): CarbonDictionaryDecoder = {
-//    val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
-//      logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
-//    val attrs = projectExprsNeedToDecode.map { attr =>
-//      val newAttr = AttributeReference(attr.name,
-//        attr.dataType,
-//        attr.nullable,
-//        attr.metadata)(attr.exprId)
-//      relation.addAttribute(newAttr)
-//      newAttr
-//    }
-//    CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
-//      CarbonAliasDecoderRelation(), scan)(sc)
-//  }
-//
-//  def isDictionaryEncoded(projectExprsNeedToDecode: Seq[Attribute],
-//      relation: CarbonDatasourceHadoopRelation): Boolean = {
-//    var isEncoded = false
-//    projectExprsNeedToDecode.foreach { attr =>
-//      if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false)) {
-//        isEncoded = true
-//      }
-//    }
-//    isEncoded
-//  }
-//
-//  def updateDataType(attr: AttributeReference,
-//      relation: CarbonDatasourceHadoopRelation,
-//      allAttrsNotDecode: java.util.Set[Attribute]): AttributeReference = {
-//    if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
-//        !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
-//      AttributeReference(attr.name,
-//        IntegerType,
-//        attr.nullable,
-//        attr.metadata)(attr.exprId, attr.qualifiers)
-//    } else {
-//      attr
-//    }
-//  }
-//}
+/**
+ * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
+ * can improve the aggregation performance and reduce memory usage
+ */
+private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
+        CarbonDictionaryDecoder(relations,
+          profile,
+          aliasMap,
+          planLater(child)
+        ) :: Nil
+      case _ => Nil
+    }
+  }
+
+}
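
For orientation, a minimal sketch of where a SparkStrategy like this plugs in: Spark exposes ExperimentalMethods.extraStrategies on the session, and injecting the strategy there makes the planner invoke its apply method for every logical plan. The object and method names below are invented for illustration, and because CarbonLateDecodeStrategy is private[sql] this wiring has to live inside the org.apache.spark.sql package; presumably the module's own session setup performs the equivalent registration.

    // Hypothetical registration helper, not part of this commit.
    package org.apache.spark.sql

    import org.apache.spark.sql.execution.CarbonLateDecodeStrategy

    object CarbonStrategyRegistration {
      def register(spark: SparkSession): Unit = {
        // Append the late-decode strategy so the planner can turn
        // CarbonDictionaryCatalystDecoder nodes into CarbonDictionaryDecoder physical plans.
        spark.experimental.extraStrategies =
          spark.experimental.extraStrategies :+ new CarbonLateDecodeStrategy
      }
    }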