Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:46 UTC

[08/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 0000000..9360ad8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.SQLContext
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.CarbonSparkFactory
+import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
+
+case class TableModel(
+    ifNotExistsSet: Boolean,
+    var databaseName: String,
+    databaseNameOp: Option[String],
+    tableName: String,
+    tableProperties: Map[String, String],
+    dimCols: Seq[Field],
+    msrCols: Seq[Field],
+    highcardinalitydims: Option[Seq[String]],
+    noInvertedIdxCols: Option[Seq[String]],
+    columnGroups: Seq[String],
+    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
+
+case class Field(column: String, var dataType: Option[String], name: Option[String],
+    children: Option[List[Field]], parent: String = null,
+    storeType: Option[String] = Some("columnar"),
+    var precision: Int = 0, var scale: Int = 0)
+
+case class ColumnProperty(key: String, value: String)
+
+case class ComplexField(complexType: String, primitiveField: Option[Field],
+    complexField: Option[ComplexField])
+
+case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
+    nodeList: Array[String])
+
+case class PartitionerField(partitionColumn: String, dataType: Option[String],
+    columnComment: String)
+
+case class DataLoadTableFileMapping(table: String, loadPath: String)
+
+case class CarbonMergerMapping(storeLocation: String,
+    storePath: String,
+    metadataFilePath: String,
+    mergedLoadName: String,
+    kettleHomePath: String,
+    tableCreationTime: Long,
+    databaseName: String,
+    factTableName: String,
+    validSegments: Array[String],
+    tableId: String,
+    // maxSegmentColCardinality is the column cardinality of the last segment of the compaction
+    var maxSegmentColCardinality: Array[Int],
+    // maxSegmentColumnSchemaList is the list of column schemas of the last segment of the compaction
+    var maxSegmentColumnSchemaList: List[ColumnSchema])
+
+case class NodeInfo(TaskId: String, noOfBlocks: Int)
+
+case class AlterTableModel(dbName: Option[String], tableName: String,
+    compactionType: String, alterSql: String)
+
+case class CompactionModel(compactionSize: Long,
+    compactionType: CompactionType,
+    carbonTable: CarbonTable,
+    tableCreationTime: Long,
+    isDDLTrigger: Boolean)
+
+case class CompactionCallableModel(storePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    storeLocation: String,
+    carbonTable: CarbonTable,
+    kettleHomePath: String,
+    cubeCreationTime: Long,
+    loadsToMerge: util.List[LoadMetadataDetails],
+    sqlContext: SQLContext,
+    compactionType: CompactionType)
+
+object TableNewProcessor {
+  def apply(cm: TableModel, sqlContext: SQLContext): TableInfo = {
+    new TableNewProcessor(cm, sqlContext).process
+  }
+}
+
+class TableNewProcessor(cm: TableModel, sqlContext: SQLContext) {
+
+  var index = 0
+  var rowGroup = 0
+
+  def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
+    var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
+    fieldChildren.foreach(fields => {
+      fields.foreach(field => {
+        val encoders = new java.util.ArrayList[Encoding]()
+        encoders.add(Encoding.DICTIONARY)
+        val columnSchema: ColumnSchema = getColumnSchema(
+          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+          field.name.getOrElse(field.column), index,
+          isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale)
+        allColumns ++= Seq(columnSchema)
+        index = index + 1
+        rowGroup = rowGroup + 1
+        if (field.children.isDefined && field.children.get != null) {
+          columnSchema.setNumberOfChild(field.children.get.size)
+          allColumns ++= getAllChildren(field.children)
+        }
+      })
+    })
+    allColumns
+  }
+
+  def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
+      encoders: java.util.List[Encoding], isDimensionCol: Boolean,
+      colGroup: Integer, precision: Integer, scale: Integer): ColumnSchema = {
+    val columnSchema = new ColumnSchema()
+    columnSchema.setDataType(dataType)
+    columnSchema.setColumnName(colName)
+    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+    if (highCardinalityDims.contains(colName)) {
+      encoders.remove(Encoding.DICTIONARY)
+    }
+    if (dataType == DataType.TIMESTAMP) {
+      encoders.add(Encoding.DIRECT_DICTIONARY)
+    }
+    val colPropMap = new java.util.HashMap[String, String]()
+    if (cm.colProps.isDefined && null != cm.colProps.get.get(colName)) {
+      val colProps = cm.colProps.get.get(colName)
+      colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
+    }
+    columnSchema.setColumnProperties(colPropMap)
+    columnSchema.setEncodingList(encoders)
+    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
+      columnSchema)
+    columnSchema.setColumnUniqueId(columnUniqueId)
+    columnSchema.setColumnReferenceId(columnUniqueId)
+    columnSchema.setColumnar(isCol)
+    columnSchema.setDimensionColumn(isDimensionCol)
+    columnSchema.setColumnGroup(colGroup)
+    columnSchema.setPrecision(precision)
+    columnSchema.setScale(scale)
+    // TODO: Need to fill RowGroupID, converted type
+    // & Number of Children after DDL finalization
+    columnSchema
+  }
+
+  // process the CREATE TABLE DDL fields and create the wrapper TableInfo object
+  def process: TableInfo = {
+    val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
+    var allColumns = Seq[ColumnSchema]()
+    var index = 0
+    cm.dimCols.foreach(field => {
+      val encoders = new java.util.ArrayList[Encoding]()
+      encoders.add(Encoding.DICTIONARY)
+      val columnSchema: ColumnSchema = getColumnSchema(
+        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+        field.name.getOrElse(field.column),
+        index,
+        isCol = true,
+        encoders,
+        isDimensionCol = true,
+        -1,
+        field.precision,
+        field.scale)
+      allColumns ++= Seq(columnSchema)
+      index = index + 1
+      if (field.children.isDefined && field.children.get != null) {
+        columnSchema.setNumberOfChild(field.children.get.size)
+        allColumns ++= getAllChildren(field.children)
+      }
+    })
+
+    cm.msrCols.foreach(field => {
+      val encoders = new java.util.ArrayList[Encoding]()
+      val columnSchema: ColumnSchema = getColumnSchema(
+        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+        field.name.getOrElse(field.column),
+        index,
+        isCol = true,
+        encoders,
+        isDimensionCol = false,
+        -1,
+        field.precision,
+        field.scale)
+      val measureCol = columnSchema
+
+      allColumns ++= Seq(measureCol)
+      index = index + 1
+    })
+
+    // Check if there are any duplicate measures or dimensions.
+    // It is based on the dimension name and measure name.
+    allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
+      val name = f._1
+      LOGGER.error(s"Duplicate column found with name: $name")
+      LOGGER.audit(
+        s"Validation failed for Create/Alter Table Operation " +
+            s"for ${ cm.databaseName }.${ cm.tableName }. " +
+            s"Duplicate column found with name: $name")
+      sys.error(s"Duplicate column found with name: $name")
+    })
+
+    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+
+    checkColGroupsValidity(cm.columnGroups, allColumns, highCardinalityDims)
+
+    updateColumnGroupsInFields(cm.columnGroups, allColumns)
+
+    var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
+    val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
+    val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
+    for (column <- allColumns) {
+      if (highCardinalityDims.contains(column.getColumnName)) {
+        newOrderedDims += column
+      } else if (column.isComplex) {
+        complexDims += column
+      } else if (column.isDimensionColumn) {
+        newOrderedDims += column
+      } else {
+        measures += column
+      }
+
+    }
+
+    // Setting the boolean value of useInvertedIndex in column schema
+    val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
+    for (column <- allColumns) {
+      // When the column is a measure or is specified as a no-inverted-index column in the DDL,
+      // set useInvertedIndex to false, otherwise true.
+      if (noInvertedIndexCols.contains(column.getColumnName) ||
+          cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
+        column.setUseInvertedIndex(false)
+      } else {
+        column.setUseInvertedIndex(true)
+      }
+    }
+
+    // Adding dummy measure if no measure is provided
+    if (measures.size < 1) {
+      val encoders = new java.util.ArrayList[Encoding]()
+      val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
+        CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
+        index,
+        true,
+        encoders,
+        false,
+        -1, 0, 0)
+      columnSchema.setInvisible(true)
+      val measureColumn = columnSchema
+      measures += measureColumn
+      allColumns = allColumns ++ measures
+    }
+    val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
+    columnValidator.validateColumns(allColumns)
+    newOrderedDims = newOrderedDims ++ complexDims ++ measures
+
+    val tableInfo = new TableInfo()
+    val tableSchema = new TableSchema()
+    val schemaEvol = new SchemaEvolution()
+    schemaEvol.setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
+    tableSchema.setTableId(UUID.randomUUID().toString)
+    // populate table properties map
+    val tablePropertiesMap = new java.util.HashMap[String, String]()
+    cm.tableProperties.foreach {
+      x => tablePropertiesMap.put(x._1, x._2)
+    }
+    tableSchema.setTableProperties(tablePropertiesMap)
+    tableSchema.setTableName(cm.tableName)
+    tableSchema.setListOfColumns(allColumns.asJava)
+    tableSchema.setSchemaEvalution(schemaEvol)
+    tableInfo.setDatabaseName(cm.databaseName)
+    tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
+    tableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    tableInfo.setFactTable(tableSchema)
+    tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]())
+    tableInfo
+  }
+
+  // Checks that every column specified in a column group is present in the field list.
+  protected def checkColGroupsValidity(colGrps: Seq[String],
+      allCols: Seq[ColumnSchema],
+      highCardCols: Seq[String]): Unit = {
+    if (null != colGrps) {
+      colGrps.foreach(columngroup => {
+        val rowCols = columngroup.split(",")
+        rowCols.foreach(colForGrouping => {
+          var found: Boolean = false
+          // check for dimensions + measures
+          allCols.foreach(eachCol => {
+            if (eachCol.getColumnName.equalsIgnoreCase(colForGrouping.trim())) {
+              found = true
+            }
+          })
+          // check for no-dictionary dimensions
+          highCardCols.foreach(noDicCol => {
+            if (colForGrouping.trim.equalsIgnoreCase(noDicCol)) {
+              found = true
+            }
+          })
+
+          if (!found) {
+            sys.error(s"column $colForGrouping is not present in Field list")
+          }
+        })
+      })
+    }
+  }
+
+  // For updating the col group details for fields.
+  private def updateColumnGroupsInFields(colGrps: Seq[String], allCols: Seq[ColumnSchema]): Unit = {
+    if (null != colGrps) {
+      var colGroupId = -1
+      colGrps.foreach(columngroup => {
+        colGroupId += 1
+        val rowCols = columngroup.split(",")
+        rowCols.foreach(row => {
+
+          allCols.foreach(eachCol => {
+
+            if (eachCol.getColumnName.equalsIgnoreCase(row.trim)) {
+              eachCol.setColumnGroup(colGroupId)
+              eachCol.setColumnar(false)
+            }
+          })
+        })
+      })
+    }
+  }
+}
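
For reference, a minimal usage sketch of the new DDL helper (illustrative only, not part of this patch; the database, table and column names and the caller's SQLContext are placeholders): TableNewProcessor turns a parsed TableModel into the wrapper TableInfo that CarbonData persists as the table schema.

    import scala.collection.mutable
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}

    def buildTableInfo(sqlContext: SQLContext) = {
      // illustrative columns only: two dictionary dimensions and one measure
      val model = TableModel(
        ifNotExistsSet = false,
        databaseName = "default",
        databaseNameOp = Some("default"),
        tableName = "employee",
        tableProperties = mutable.Map[String, String](),
        dimCols = Seq(Field("name", Some("string"), Some("name"), None),
                      Field("city", Some("string"), Some("city"), None)),
        msrCols = Seq(Field("salary", Some("double"), Some("salary"), None)),
        highcardinalitydims = None,
        noInvertedIdxCols = None,
        columnGroups = Seq.empty)
      // returns a TableInfo whose fact table schema holds the dimension and measure columns
      TableNewProcessor(model, sqlContext)
    }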

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
new file mode 100644
index 0000000..e08660c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+object DistributionUtil {
+  @transient
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /*
+   * This method returns the list of executors in the cluster.
+   * It takes the memory status of all nodes via getExecutorMemoryStatus and extracts
+   * the keys. getExecutorMemoryStatus also includes the driver.
+   * In client mode the driver runs on localhost, and an executor may be spawned
+   * on the same driver node, so the first occurrence of localhost is removed
+   * when building the executor list.
+   */
+  def getNodeList(sparkContext: SparkContext): Array[String] = {
+    val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
+      kv._1.split(":")(0)
+    }.toSeq
+    val localhostIPs = getLocalhostIPs
+    val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
+
+    val nodelist: List[String] = withoutDriverIP(arr.toList)(selectedLocalIPList.contains(_))
+    val masterMode = sparkContext.getConf.get("spark.master")
+    if (nodelist.nonEmpty) {
+      // Specific for Yarn Mode
+      if ("yarn-cluster".equals(masterMode) || "yarn-client".equals(masterMode)) {
+        val nodeNames = nodelist.map { x =>
+          val addr = InetAddress.getByName(x)
+          addr.getHostName
+        }
+        nodeNames.toArray
+      } else {
+        // For Standalone cluster, node IPs will be returned.
+        nodelist.toArray
+      }
+    } else {
+      Seq(InetAddress.getLocalHost.getHostName).toArray
+    }
+  }
+
+  private def getLocalhostIPs = {
+    val iface = NetworkInterface.getNetworkInterfaces
+    var addresses: List[InterfaceAddress] = List.empty
+    while (iface.hasMoreElements) {
+      addresses = iface.nextElement().getInterfaceAddresses.asScala.toList ++ addresses
+    }
+    val inets = addresses.map(_.getAddress.getHostAddress)
+    inets
+  }
+
+  /*
+   * This method removes the first occurrence of any of the IPs matched by the predicate.
+   * E.g. l = List(Master, slave1, Master, slave2, slave3) is the list of nodes where the
+   * first Master is the driver node.
+   * withoutDriverIP(l)(x => x == "Master") removes the first occurrence of Master.
+   * The resulting list is List(slave1, Master, slave2, slave3).
+   */
+  def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = {
+    xs match {
+      case x :: rest => if (p(x)) {
+        rest
+      } else {
+        x :: withoutDriverIP(rest)(p)
+      }
+      case _ => Nil
+    }
+  }
+
+  /**
+   *
+   * Checks whether the number of nodes holding data exceeds the configured executor
+   * count; if so, the configured count is used, and the required executors are ensured.
+   *
+   * @param blockList
+   * @param sparkContext
+   * @return
+   */
+  def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
+      sparkContext: SparkContext): Seq[String] = {
+    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+    ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
+  }
+
+  def ensureExecutorsByNumberAndGetNodeList(nodesOfData: Int,
+      sparkContext: SparkContext): Seq[String] = {
+    var confExecutorsTemp: String = null
+    if (sparkContext.getConf.contains("spark.executor.instances")) {
+      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
+    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
+               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+                 .equalsIgnoreCase("true")) {
+      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
+        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+      }
+    }
+
+    val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
+      confExecutors
+    } else {
+      nodesOfData
+    }
+
+    val startTime = System.currentTimeMillis()
+    ensureExecutors(sparkContext, requiredExecutors)
+    var nodes = DistributionUtil.getNodeList(sparkContext)
+    var maxTimes = 30
+    while (nodes.length < requiredExecutors && maxTimes > 0) {
+      Thread.sleep(500)
+      nodes = DistributionUtil.getNodeList(sparkContext)
+      maxTimes = maxTimes - 1
+    }
+    val timDiff = System.currentTimeMillis() - startTime
+    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
+    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
+    nodes.distinct
+  }
+
+  /**
+   * Requesting the extra executors other than the existing ones.
+   *
+   * @param sc
+   * @param numExecutors
+   * @return
+   */
+  def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        val requiredExecutors = numExecutors - b.numExistingExecutors
+        LOGGER.info(s"number of required executors = $numExecutors, existing executors = " +
+            s"${ b.numExistingExecutors }")
+        if (requiredExecutors > 0) {
+          b.requestExecutors(requiredExecutors)
+        }
+        true
+      case _ =>
+        false
+    }
+  }
+}
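
As a quick illustration of withoutDriverIP (host names below are made up), only the first element matched by the predicate, i.e. the driver, is dropped; later occurrences are kept:

    import org.apache.spark.sql.hive.DistributionUtil

    val hosts = List("master", "slave1", "master", "slave2", "slave3")
    val executors = DistributionUtil.withoutDriverIP(hosts)(_ == "master")
    // executors == List("slave1", "master", "slave2", "slave3")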

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
new file mode 100644
index 0000000..7909a13
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.optimizer
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical._
+
+abstract class AbstractNode
+
+case class Node(cd: CarbonDictionaryTempDecoder) extends AbstractNode
+
+case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
+  extends AbstractNode
+
+case class CarbonDictionaryTempDecoder(
+    attrList: util.Set[AttributeReferenceWrapper],
+    attrsNotDecode: util.Set[AttributeReferenceWrapper],
+    child: LogicalPlan,
+    isOuter: Boolean = false) extends UnaryNode {
+  var processed = false
+
+  def getAttrsNotDecode: util.Set[Attribute] = {
+    val set = new util.HashSet[Attribute]()
+    attrsNotDecode.asScala.foreach(f => set.add(f.attr))
+    set
+  }
+
+  def getAttrList: util.Set[Attribute] = {
+    val set = new util.HashSet[Attribute]()
+    attrList.asScala.foreach(f => set.add(f.attr))
+    set
+  }
+
+  override def output: Seq[Attribute] = child.output
+}
+
+class CarbonDecoderProcessor {
+
+  def getDecoderList(plan: LogicalPlan): util.List[AbstractNode] = {
+    val nodeList = new util.ArrayList[AbstractNode]
+    process(plan, nodeList)
+    nodeList
+  }
+
+  private def process(plan: LogicalPlan, nodeList: util.List[AbstractNode]): Unit = {
+    plan match {
+      case cd: CarbonDictionaryTempDecoder =>
+        nodeList.add(Node(cd))
+        process(cd.child, nodeList)
+      case j: BinaryNode =>
+        val leftList = new util.ArrayList[AbstractNode]
+        val rightList = new util.ArrayList[AbstractNode]
+        nodeList.add(BinaryCarbonNode(leftList, rightList))
+        process(j.left, leftList)
+        process(j.right, rightList)
+      case e: UnaryNode => process(e.child, nodeList)
+      case _ =>
+    }
+  }
+
+  def updateDecoders(nodeList: util.List[AbstractNode]): Unit = {
+    val scalaList = nodeList.asScala
+    val decoderNotDecode = new util.HashSet[AttributeReferenceWrapper]
+    updateDecoderInternal(scalaList, decoderNotDecode)
+  }
+
+  private def updateDecoderInternal(scalaList: mutable.Buffer[AbstractNode],
+      decoderNotDecode: util.HashSet[AttributeReferenceWrapper]): Unit = {
+    scalaList.reverseMap {
+      case Node(cd: CarbonDictionaryTempDecoder) =>
+        decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
+        decoderNotDecode.asScala.foreach(cd.attrList.remove)
+        decoderNotDecode.addAll(cd.attrList)
+      case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
+        val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
+        val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
+        updateDecoderInternal(left.asScala, leftNotDecode)
+        updateDecoderInternal(right.asScala, rightNotDecode)
+        decoderNotDecode.addAll(leftNotDecode)
+        decoderNotDecode.addAll(rightNotDecode)
+    }
+  }
+
+}
+
+case class AttributeReferenceWrapper(attr: Attribute) {
+
+  override def equals(other: Any): Boolean = other match {
+    case ar: AttributeReferenceWrapper =>
+      attr.name.equalsIgnoreCase(ar.attr.name) && attr.exprId == ar.attr.exprId
+    case _ => false
+  }
+
+  // constant hash value
+  lazy val hash = (attr.name.toLowerCase + "." + attr.exprId.id).hashCode
+  override def hashCode: Int = hash
+}
+
+case class Marker(set: util.Set[AttributeReferenceWrapper], binary: Boolean = false)
+
+class CarbonPlanMarker {
+  val markerStack = new util.Stack[Marker]
+  var joinCount = 0
+
+  def pushMarker(attrs: util.Set[AttributeReferenceWrapper]): Unit = {
+    markerStack.push(Marker(attrs))
+  }
+
+  def pushBinaryMarker(attrs: util.Set[AttributeReferenceWrapper]): Unit = {
+    markerStack.push(Marker(attrs, binary = true))
+    joinCount = joinCount + 1
+  }
+
+  def revokeJoin(): util.Set[AttributeReferenceWrapper] = {
+    if (joinCount > 0) {
+      while (!markerStack.empty()) {
+        val marker = markerStack.pop()
+        if (marker.binary) {
+          joinCount = joinCount - 1
+          return marker.set
+        }
+      }
+    }
+    markerStack.peek().set
+  }
+
+}
+
+
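
A small sketch of how AttributeReferenceWrapper compares attributes (assuming a plain Catalyst AttributeReference built with its default ExprId; the column name is arbitrary): equality ignores the case of the attribute name but requires the same ExprId, and the hash code is precomputed from the lower-cased name and the ExprId.

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
    import org.apache.spark.sql.types.StringType

    val city = AttributeReference("CITY", StringType)()
    val sameColumn = city.withName("city") // same ExprId, different case
    AttributeReferenceWrapper(city) == AttributeReferenceWrapper(sameColumn) // true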

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
new file mode 100644
index 0000000..387d1ef
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.processing.etl.DataLoadingException
+
+object FileUtils {
+  /**
+   * appends all CSV file paths to a single String, with the paths separated by commas
+   */
+  private def getPathsFromCarbonFile(carbonFile: CarbonFile, stringBuild: StringBuilder): Unit = {
+    if (carbonFile.isDirectory) {
+      val files = carbonFile.listFiles()
+      for (j <- 0 until files.size) {
+        getPathsFromCarbonFile(files(j), stringBuild)
+      }
+    } else {
+      val path = carbonFile.getAbsolutePath
+      val fileName = carbonFile.getName
+      if (carbonFile.getSize == 0) {
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+            .warn(s"skip empty input file: $path")
+      } else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
+                 fileName.startsWith(CarbonCommonConstants.POINT)) {
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+            .warn(s"skip invisible input file: $path")
+      } else {
+        stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
+      }
+    }
+  }
+
+  /**
+   * appends all file paths under inputPath to a single String, separated by commas
+   *
+   */
+  def getPaths(inputPath: String): String = {
+    if (inputPath == null || inputPath.isEmpty) {
+      throw new DataLoadingException("Input file path cannot be empty.")
+    } else {
+      val stringBuild = new StringBuilder()
+      val filePaths = inputPath.split(",")
+      for (i <- 0 until filePaths.size) {
+        val fileType = FileFactory.getFileType(filePaths(i))
+        val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType)
+        if (!carbonFile.exists()) {
+          throw new DataLoadingException(s"The input file does not exist: ${filePaths(i)}" )
+        }
+        getPathsFromCarbonFile(carbonFile, stringBuild)
+      }
+      if (stringBuild.nonEmpty) {
+        stringBuild.substring(0, stringBuild.size - 1)
+      } else {
+        throw new DataLoadingException("Please check your input path and make sure " +
+                                       "that files end with '.csv' and content is not empty.")
+      }
+    }
+  }
+
+  def getSpaceOccupied(inputPath: String): Long = {
+    var size : Long = 0
+    if (inputPath == null || inputPath.isEmpty) {
+      size
+    } else {
+      val filePaths = inputPath.split(",")
+      for (i <- 0 until filePaths.size) {
+        val fileType = FileFactory.getFileType(filePaths(i))
+        val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType)
+        size = size + carbonFile.getSize
+      }
+      size
+    }
+  }
+
+}
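
A hedged usage sketch (the paths are placeholders): getPaths expands each comma separated entry into the individual files it contains, skipping empty or hidden files, and returns them as one comma separated String; getSpaceOccupied adds up the size reported for each entry.

    import org.apache.spark.util.FileUtils

    val csvFiles = FileUtils.getPaths("/data/sales/2016/,/data/sales/extra.csv")
    // e.g. "/data/sales/2016/part-0.csv,/data/sales/2016/part-1.csv,/data/sales/extra.csv"
    val totalBytes = FileUtils.getSpaceOccupied("/data/sales/2016/,/data/sales/extra.csv")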

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
new file mode 100644
index 0000000..516ba58
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.tools.reflect.ToolBox
+
+/**
+ * It compiles the code dynamically at runtime and returns the object
+ */
+object ScalaCompilerUtil {
+
+  def compiledCode(code: String): Any = {
+    import scala.reflect.runtime.universe._
+    val cm = runtimeMirror(Utils.getContextOrSparkClassLoader)
+    val toolbox = cm.mkToolBox()
+
+    val tree = toolbox.parse(code)
+    toolbox.compile(tree)()
+  }
+}
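
A small sketch of how the runtime compiler is expected to be used (the code string is arbitrary): the string is parsed and compiled with a reflection ToolBox, the resulting thunk is evaluated, and the value comes back as Any, so the caller casts it.

    import org.apache.spark.util.ScalaCompilerUtil

    val toUpper = ScalaCompilerUtil.compiledCode("(s: String) => s.trim.toUpperCase")
      .asInstanceOf[String => String]
    toUpper("  carbondata ") // returns "CARBONDATA"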

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
new file mode 100644
index 0000000..e2f4c3e
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
+
+import org.apache.carbondata.core.load.BlockDetails
+
+/*
+ * This object is used to handle file splits.
+ */
+object SparkUtil {
+
+  def setTaskContext(context: TaskContext): Unit = {
+    val localThreadContext = TaskContext.get()
+    if (localThreadContext == null) {
+      TaskContext.setTaskContext(context)
+    }
+  }
+
+  /**
+   * gets file splits and returns Array[BlockDetails]; if the file path is empty, returns an empty Array
+   *
+   */
+  def getSplits(path: String, sc: SparkContext): Array[BlockDetails] = {
+    val filePath = FileUtils.getPaths(path)
+    if (filePath == null || filePath.isEmpty) {
+      // return an empty array of block details
+      Array[BlockDetails]()
+    } else {
+      // clone the hadoop configuration
+      val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
+      // set folder or file
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePath)
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+      val newHadoopRDD = new NewHadoopRDD[LongWritable, Text](
+        sc,
+        classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
+        classOf[LongWritable],
+        classOf[Text],
+        hadoopConfiguration)
+      val splits: Array[FileSplit] = newHadoopRDD.getPartitions.map { part =>
+        part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
+      }
+      splits.map { block =>
+        new BlockDetails(block.getPath,
+          block.getStart,
+          block.getLength,
+          block.getLocations
+        )
+      }
+    }
+  }
+}
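
A hedged usage sketch (the path and SparkContext are placeholders): getSplits resolves the input into Hadoop file splits and wraps each one in a BlockDetails so a load can be parallelised per block.

    import org.apache.spark.SparkContext
    import org.apache.spark.util.SparkUtil

    def countBlocks(sc: SparkContext): Int = {
      // one BlockDetails per underlying file split (path, offset, length, locations)
      val blocks = SparkUtil.getSplits("/data/sales/2016/", sc)
      blocks.length
    }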

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
deleted file mode 100644
index 6a2c839..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.scan.executor.QueryExecutor;
-import org.apache.carbondata.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.scan.model.QueryDimension;
-import org.apache.carbondata.scan.model.QueryMeasure;
-import org.apache.carbondata.scan.model.QueryModel;
-import org.apache.carbondata.scan.result.BatchResult;
-import org.apache.carbondata.scan.result.iterator.RawResultIterator;
-
-/**
- * Executor class for executing the query on the selected segments to be merged.
- * This will fire a select * query and get the raw result.
- */
-public class CarbonCompactionExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-  private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
-  private final SegmentProperties destinationSegProperties;
-  private final String databaseName;
-  private final String factTableName;
-  private final Map<String, TaskBlockInfo> segmentMapping;
-  private final String storePath;
-  private QueryExecutor queryExecutor;
-  private CarbonTable carbonTable;
-  private QueryModel queryModel;
-
-  /**
-   * Constructor
-   *
-   * @param segmentMapping
-   * @param segmentProperties
-   * @param databaseName
-   * @param factTableName
-   * @param storePath
-   * @param carbonTable
-   */
-  public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
-      SegmentProperties segmentProperties, String databaseName, String factTableName,
-      String storePath, CarbonTable carbonTable,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) {
-
-    this.segmentMapping = segmentMapping;
-
-    this.destinationSegProperties = segmentProperties;
-
-    this.databaseName = databaseName;
-
-    this.factTableName = factTableName;
-
-    this.storePath = storePath;
-
-    this.carbonTable = carbonTable;
-
-    this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
-  }
-
-  /**
-   * For processing of the table blocks.
-   *
-   * @return List of Carbon iterators
-   */
-  public List<RawResultIterator> processTableBlocks() throws QueryExecutionException {
-
-    List<RawResultIterator> resultList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
-    // iterate each seg ID
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
-      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-
-      int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
-
-      SegmentProperties sourceSegProperties =
-          new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
-
-      // for each segment get taskblock info
-      TaskBlockInfo taskBlockInfo = taskMap.getValue();
-      Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
-
-      for (String task : taskBlockListMapping) {
-
-        list = taskBlockInfo.getTableBlockInfoList(task);
-        Collections.sort(list);
-        LOGGER.info("for task -" + task + "-block size is -" + list.size());
-        queryModel.setTableBlockInfos(list);
-        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
-            destinationSegProperties));
-
-      }
-    }
-
-    return resultList;
-  }
-
-  /**
-   * get executor and execute the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
-      throws QueryExecutionException {
-
-    queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
-    CarbonIterator<BatchResult> iter = null;
-    try {
-      iter = queryExecutor.execute(queryModel);
-    } catch (QueryExecutionException e) {
-      LOGGER.error(e.getMessage());
-      throw e;
-    }
-
-    return iter;
-  }
-
-  /**
-   * Below method will be used
-   * for cleanup
-   */
-  public void finish() {
-    try {
-      queryExecutor.finish();
-    } catch (QueryExecutionException e) {
-      LOGGER.error(e, "Problem while finish: ");
-    }
-    clearDictionaryFromQueryModel();
-  }
-
-  /**
-   * This method will clear the dictionary access count after its usage is complete so
-   * that column can be deleted form LRU cache whenever memory reaches threshold
-   */
-  private void clearDictionaryFromQueryModel() {
-    if (null != queryModel) {
-      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
-      if (null != columnToDictionaryMapping) {
-        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
-          CarbonUtil.clearDictionaryCache(entry.getValue());
-        }
-      }
-    }
-  }
-
-  /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  public QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-
-    QueryModel model = new QueryModel();
-
-    model.setTableBlockInfos(blockList);
-    model.setCountStarQuery(false);
-    model.setDetailQuery(true);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    for (CarbonDimension dim : destinationSegProperties.getDimensions()) {
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (CarbonMeasure carbonMeasure : destinationSegProperties.getMeasures()) {
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-
-    model.setQueryId(System.nanoTime() + "");
-
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-
-    model.setAggTable(false);
-    model.setLimit(-1);
-
-    model.setTable(carbonTable);
-
-    model.setInMemoryRecordSize(CarbonCommonConstants.COMPACTION_INMEMORY_RECORD_SIZE);
-
-    return model;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
deleted file mode 100644
index ed669a6..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.CarbonUtilException;
-
-import org.apache.spark.sql.hive.TableMeta;
-
-/**
- * Utility Class for the Compaction Flow.
- */
-public class CarbonCompactionUtil {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-
-  /**
-   * To create a mapping of Segment Id and TableBlockInfo.
-   *
-   * @param tableBlockInfoList
-   * @return
-   */
-  public static Map<String, TaskBlockInfo> createMappingForSegments(
-      List<TableBlockInfo> tableBlockInfoList) {
-
-    // stores taskBlockInfo of each segment
-    Map<String, TaskBlockInfo> segmentBlockInfoMapping =
-        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-
-    for (TableBlockInfo info : tableBlockInfoList) {
-      String segId = info.getSegmentId();
-      // check if segId is already present in map
-      TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
-      // extract task ID from file Path.
-      String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
-      // if taskBlockInfo is not there, then create and add
-      if (null == taskBlockInfoMapping) {
-        taskBlockInfoMapping = new TaskBlockInfo();
-        groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
-        // put the taskBlockInfo with respective segment id
-        segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
-      } else
-      {
-        groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
-      }
-    }
-    return segmentBlockInfoMapping;
-
-  }
-
-  /**
-   * Grouping the taskNumber and list of TableBlockInfo.
-   * @param info
-   * @param taskBlockMapping
-   * @param taskNo
-   */
-  private static void groupCorrespodingInfoBasedOnTask(TableBlockInfo info,
-      TaskBlockInfo taskBlockMapping, String taskNo) {
-    // get the corresponding list from task mapping.
-    List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
-    if (null != blockLists) {
-      blockLists.add(info);
-    } else {
-      blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-      blockLists.add(info);
-      taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
-    }
-  }
-
-  /**
-   * To create a mapping of Segment Id and DataFileFooter.
-   *
-   * @param tableBlockInfoList
-   * @return
-   */
-  public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
-      List<TableBlockInfo> tableBlockInfoList) throws IndexBuilderException {
-
-    Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
-    for (TableBlockInfo blockInfo : tableBlockInfoList) {
-      List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
-      String segId = blockInfo.getSegmentId();
-
-      DataFileFooter dataFileMatadata = null;
-      // check if segId is already present in map
-      List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
-      try {
-        dataFileMatadata = CarbonUtil
-            .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
-                blockInfo.getBlockLength());
-      } catch (CarbonUtilException e) {
-        throw new IndexBuilderException(e);
-      }
-      if (null == metadataList) {
-        // if it is not present
-        eachSegmentBlocks.add(dataFileMatadata);
-        segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
-      } else {
-
-        // if its already present then update the list.
-        metadataList.add(dataFileMatadata);
-      }
-    }
-    return segmentBlockInfoMapping;
-
-  }
-
-  /**
-   * Check whether the file to indicate the compaction is present or not.
-   * @param metaFolderPath
-   * @return
-   */
-  public static boolean isCompactionRequiredForTable(String metaFolderPath) {
-    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.minorCompactionRequiredFile;
-
-    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.majorCompactionRequiredFile;
-    try {
-      if (FileFactory.isFileExist(minorCompactionStatusFile,
-          FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
-          .isFileExist(majorCompactionStatusFile,
-              FileFactory.getFileType(majorCompactionStatusFile))) {
-        return true;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage() );
-    }
-    return false;
-  }
-
-  /**
-   * Determine the type of the compaction received.
-   * @param metaFolderPath
-   * @return
-   */
-  public static CompactionType determineCompactionType(String metaFolderPath) {
-    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.minorCompactionRequiredFile;
-
-    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.majorCompactionRequiredFile;
-    try {
-      if (FileFactory.isFileExist(minorCompactionStatusFile,
-          FileFactory.getFileType(minorCompactionStatusFile))) {
-        return CompactionType.MINOR_COMPACTION;
-      }
-      if (FileFactory.isFileExist(majorCompactionStatusFile,
-          FileFactory.getFileType(majorCompactionStatusFile))) {
-        return CompactionType.MAJOR_COMPACTION;
-      }
-
-    } catch (IOException e) {
-      LOGGER.error("Exception in determining the compaction request file " + e.getMessage() );
-    }
-    return CompactionType.MINOR_COMPACTION;
-  }
-
-  /**
-   * Delete the compation request file once the compaction is done.
-   * @param metaFolderPath
-   * @param compactionType
-   * @return
-   */
-  public static boolean deleteCompactionRequiredFile(String metaFolderPath,
-      CompactionType compactionType) {
-    String compactionRequiredFile;
-    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
-      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.minorCompactionRequiredFile;
-    } else {
-      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.majorCompactionRequiredFile;
-    }
-    try {
-      if (FileFactory
-          .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
-        if (FileFactory
-            .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
-            .delete()) {
-          LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
-          return true;
-        } else {
-          LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
-        }
-      } else {
-        LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
-    }
-    return false;
-  }
-
-  /**
-   * Creation of the compaction request if someother compaction is in progress.
-   * @param metaFolderPath
-   * @param compactionType
-   * @return
-   */
-  public static boolean createCompactionRequiredFile(String metaFolderPath,
-      CompactionType compactionType) {
-    String statusFile;
-    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
-      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.minorCompactionRequiredFile;
-    } else {
-      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.majorCompactionRequiredFile;
-    }
-    try {
-      if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
-        if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
-          LOGGER.info("successfully created a compaction required file - " + statusFile);
-          return true;
-        } else {
-          LOGGER.error("Not able to create a compaction required file - " + statusFile);
-          return false;
-        }
-      } else {
-        LOGGER.info("Compaction request file : " + statusFile + " already exist.");
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception in creating the compaction request file " + e.getMessage() );
-    }
-    return false;
-  }
-
-  /**
-   * This will check if any compaction request has been received for any table.
-   *
-   * @param tableMetas
-   * @return
-   */
-  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
-      List<CarbonTableIdentifier> skipList) {
-    for (TableMeta table : tableMetas) {
-      CarbonTable ctable = table.carbonTable();
-      String metadataPath = ctable.getMetaDataFilepath();
-      // check for the compaction required file and at the same time exclude the tables which are
-      // present in the skip list.
-      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
-          .contains(table.carbonTableIdentifier())) {
-        return table;
-      }
-    }
-    return null;
-  }
-}
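
The three methods above implement a simple file-based handshake: a pending compaction is recorded by creating a marker file in the table's metadata folder, later picked up by checking which marker exists, and finally cleared by deleting it. A minimal sketch of that workflow follows; it assumes these static CarbonCompactionUtil methods remain available after the move to spark-common, and the metadata path shown is hypothetical.

    // Hypothetical driver for the marker-file handshake shown above.
    String metaFolderPath = "/store/default/sample_table/Metadata";  // assumed path

    // Another compaction is running, so only record the request for later.
    CarbonCompactionUtil.createCompactionRequiredFile(metaFolderPath,
        CompactionType.MAJOR_COMPACTION);

    // Once the running compaction finishes, pick up and clear the pending request.
    CompactionType pendingType = CarbonCompactionUtil.determineCompactionType(metaFolderPath);
    // ... run the compaction for pendingType ...
    CarbonCompactionUtil.deleteCompactionRequiredFile(metaFolderPath, pendingType);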

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java
deleted file mode 100644
index 3aa0e9f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.merger;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.spark.rdd.Compactor;
-
-import org.apache.spark.sql.execution.command.CompactionCallableModel;
-
-/**
- * Callable class used to trigger the compaction as a separate task.
- */
-public class CompactionCallable implements Callable<Void> {
-
-  private final CompactionCallableModel compactionCallableModel;
-
-  public CompactionCallable(CompactionCallableModel compactionCallableModel) {
-
-    this.compactionCallableModel = compactionCallableModel;
-  }
-
-  @Override public Void call() throws Exception {
-
-    Compactor.triggerCompaction(compactionCallableModel);
-    return null;
-
-  }
-}
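
CompactionCallable is a thin wrapper that lets Compactor.triggerCompaction run on an executor thread. Below is a sketch of how such a callable is typically submitted; the executor setup and the compactionCallableModel instance are assumptions for illustration, not code from this patch.

    // Sketch only: run the compaction on a single worker thread.
    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<Void> future = executor.submit(new CompactionCallable(compactionCallableModel));
    future.get();        // blocks; any failure surfaces as an ExecutionException
    executor.shutdown();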

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java
deleted file mode 100644
index 40dd59d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-/**
- * This enum defines the types of compaction.
- * There are two types: minor and major.
- */
-public enum CompactionType {
-    MINOR_COMPACTION,
-    MAJOR_COMPACTION
-}
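
This enum is what the DDL layer passes down when a user requests a minor or major compaction. A small sketch of the usual mapping; the variable holding the user keyword is hypothetical.

    // Map the user-supplied keyword onto the enum; anything other than "major"
    // falls back to a minor compaction (hypothetical parsing helper).
    CompactionType compactionType = "MAJOR".equalsIgnoreCase(userCompactionKind)
        ? CompactionType.MAJOR_COMPACTION
        : CompactionType.MINOR_COMPACTION;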

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
deleted file mode 100644
index 1028f78..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.io.File;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
-
-/**
- * Merger class responsible for merging the segments during compaction.
- */
-public class RowResultMerger {
-
-  private final String databaseName;
-  private final String tableName;
-  private final String tempStoreLocation;
-  private final int measureCount;
-  private final String factStoreLocation;
-  private CarbonFactHandler dataHandler;
-  private List<RawResultIterator> rawResultIteratorList =
-      new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  private SegmentProperties segprop;
-  /**
-   * record holder heap
-   */
-  private AbstractQueue<RawResultIterator> recordHolderHeap;
-
-  private TupleConversionAdapter tupleConvertor;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RowResultMerger.class.getName());
-
-  public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
-      String tableName, SegmentProperties segProp, String tempStoreLocation,
-      CarbonLoadModel loadModel, int[] colCardinality) {
-
-    this.rawResultIteratorList = iteratorList;
-    // create the List of RawResultIterator.
-
-    recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
-        new RowResultMerger.CarbonMdkeyComparator());
-
-    this.segprop = segProp;
-    this.tempStoreLocation = tempStoreLocation;
-
-    this.factStoreLocation = loadModel.getStorePath();
-
-    if (!new File(tempStoreLocation).mkdirs()) {
-      LOGGER.error("Error while new File(tempStoreLocation).mkdirs() ");
-    }
-
-    this.databaseName = databaseName;
-    this.tableName = tableName;
-
-    this.measureCount = segprop.getMeasures().size();
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-            .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
-        getCarbonFactDataHandlerModel(loadModel);
-    carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
-    CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
-            loadModel.getFactTimeStamp());
-    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
-    if (segProp.getNumberOfNoDictionaryDimension() > 0
-        || segProp.getComplexDimensions().size() > 0) {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
-    } else {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
-    }
-    carbonFactDataHandlerModel.setColCardinality(colCardinality);
-    carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-
-    tupleConvertor = new TupleConversionAdapter(segProp);
-  }
-
-  /**
-   * Merge function
-   *
-   */
-  public boolean mergerSlice() {
-    boolean mergeStatus = false;
-    int index = 0;
-    try {
-
-      dataHandler.initialise();
-
-      // add all iterators to the queue
-      for (RawResultIterator leafTupleIterator : this.rawResultIteratorList) {
-        this.recordHolderHeap.add(leafTupleIterator);
-        index++;
-      }
-      RawResultIterator iterator = null;
-      while (index > 1) {
-        // poll the top record from the heap
-        iterator = this.recordHolderHeap.poll();
-        Object[] convertedRow = iterator.next();
-        if(null == convertedRow){
-          throw new SliceMergerException("Unable to generate mdkey during compaction.");
-        }
-        // get the mdkey
-        addRow(convertedRow);
-        // if the iterator has no more records then decrement the index
-        if (!iterator.hasNext()) {
-          index--;
-          continue;
-        }
-        // add record to heap
-        this.recordHolderHeap.add(iterator);
-      }
-      // if the record holder heap is not empty then drain the remaining
-      // iterator from the heap
-      iterator = this.recordHolderHeap.poll();
-      while (true) {
-        Object[] convertedRow = iterator.next();
-        if(null == convertedRow){
-          throw new SliceMergerException("Unable to generate mdkey during compaction.");
-        }
-        addRow(convertedRow);
-        // check if leaf contains no record
-        if (!iterator.hasNext()) {
-          break;
-        }
-      }
-      this.dataHandler.finish();
-      mergeStatus = true;
-    } catch (Exception e) {
-      LOGGER.error("Exception in compaction merger " + e.getMessage());
-      mergeStatus = false;
-    } finally {
-      try {
-        this.dataHandler.closeHandler();
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
-        mergeStatus = false;
-      }
-    }
-
-    return mergeStatus;
-  }
-
-  /**
-   * Adds a sorted row to the data handler.
-   *
-   * @throws SliceMergerException
-   */
-  protected void addRow(Object[] carbonTuple) throws SliceMergerException {
-    Object[] rowInWritableFormat;
-
-    rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
-    try {
-      this.dataHandler.addDataToStore(rowInWritableFormat);
-    } catch (CarbonDataWriterException e) {
-      throw new SliceMergerException("Problem in merging the slice", e);
-    }
-  }
-
-  /**
-   * This method will create a model object for carbon fact data handler
-   *
-   * @param loadModel
-   * @return
-   */
-  private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) {
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
-    carbonFactDataHandlerModel.setDatabaseName(databaseName);
-    carbonFactDataHandlerModel.setTableName(tableName);
-    carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size());
-    carbonFactDataHandlerModel.setCompactionFlow(true);
-    carbonFactDataHandlerModel
-        .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes());
-    carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
-    carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality());
-    carbonFactDataHandlerModel.setSegmentProperties(segprop);
-    carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension());
-    carbonFactDataHandlerModel.setDimensionCount(
-        segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount());
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    List<ColumnSchema> wrapperColumnSchema = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
-            carbonTable.getMeasureByTableName(tableName));
-    carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
-    // TODO: handle complex types here.
-    Map<Integer, GenericDataType> complexIndexMap =
-        new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size());
-    carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
-    carbonFactDataHandlerModel.setDataWritingRequest(true);
-
-    char[] aggType = new char[segprop.getMeasures().size()];
-    Arrays.fill(aggType, 'n');
-    int i = 0;
-    for (CarbonMeasure msr : segprop.getMeasures()) {
-      aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
-    }
-    carbonFactDataHandlerModel.setAggType(aggType);
-    carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality());
-
-    String carbonDataDirectoryPath =
-        checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName,
-            loadModel.getPartitionId(), loadModel.getSegmentId());
-    carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
-
-    List<CarbonDimension> dimensionByTableName =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
-    boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
-    int index = 0;
-    for (CarbonDimension dimension : dimensionByTableName) {
-      isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
-    }
-    carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
-    return carbonFactDataHandlerModel;
-  }
-
-  /**
-   * This method will get the store location for the given path, segment id and partition id
-   *
-   * @return data directory path
-   */
-  private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
-      String tableName, String partitionId, String segmentId) {
-    String carbonStorePath = factStoreLocation;
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
-    return carbonDataDirectoryPath;
-  }
-
-  /**
-   * Comparator class for comparing two raw row results.
-   */
-  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
-
-    @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
-
-      Object[] row1 = new Object[0];
-      Object[] row2 = new Object[0];
-      try {
-        row1 = o1.fetchConverted();
-        row2 = o2.fetchConverted();
-      } catch (KeyGenException e) {
-        LOGGER.error(e.getMessage());
-      }
-      if (null == row1 || null == row2) {
-        return 0;
-      }
-      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
-      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
-      int compareResult = 0;
-      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
-      int dictionaryKeyOffset = 0;
-      byte[] dimCols1 = key1.getDictionaryKey();
-      byte[] dimCols2 = key2.getDictionaryKey();
-      int noDicIndex = 0;
-      for (int eachColumnValueSize : columnValueSizes) {
-        // case of dictionary cols
-        if (eachColumnValueSize > 0) {
-
-          compareResult = ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
-                  dictionaryKeyOffset, eachColumnValueSize);
-          dictionaryKeyOffset += eachColumnValueSize;
-        } else { // case of no dictionary
-
-          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
-          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
-          compareResult =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
-          noDicIndex++;
-
-        }
-        if (0 != compareResult) {
-          return compareResult;
-        }
-      }
-      return 0;
-    }
-  }
-
-}
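
mergerSlice() above is essentially a k-way merge: every segment contributes one sorted RawResultIterator, a PriorityQueue ordered by the MDKey comparator always exposes the iterator with the smallest head row, and that row is written before the iterator is re-added. The self-contained sketch below shows the same pattern with plain integers standing in for rows; it is illustrative only and not CarbonData code.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Illustrative k-way merge: one cursor per sorted source, smallest head first.
    final class KWayMergeSketch {

      // A cursor holds the current (peeked) element of one sorted source, much like
      // RawResultIterator.fetchConverted() exposes the next row of a segment.
      private static final class Cursor {
        int current;
        final Iterator<Integer> it;
        Cursor(Iterator<Integer> it) { this.it = it; this.current = it.next(); }
        boolean advance() {
          if (!it.hasNext()) {
            return false;
          }
          current = it.next();
          return true;
        }
      }

      static List<Integer> merge(List<List<Integer>> sortedSources) {
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.current, b.current));
        for (List<Integer> source : sortedSources) {
          if (!source.isEmpty()) {
            heap.add(new Cursor(source.iterator()));
          }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
          Cursor smallest = heap.poll();   // iterator with the smallest head row
          merged.add(smallest.current);    // the addRow() step in RowResultMerger
          if (smallest.advance()) {
            heap.add(smallest);            // re-add while the source still has rows
          }
        }
        return merged;
      }
    }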

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java
deleted file mode 100644
index c35b3ff..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
-import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
-
-/**
- * This class converts the raw result into the format used by the data writer.
- */
-public class TupleConversionAdapter {
-
-  private final SegmentProperties segmentproperties;
-
-  private final List<CarbonMeasure> measureList;
-
-  private int noDictionaryPresentIndex;
-
-  private int measureCount;
-
-  private boolean isNoDictionaryPresent;
-
-  public TupleConversionAdapter(SegmentProperties segmentProperties) {
-    this.measureCount = segmentProperties.getMeasures().size();
-    this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
-    if (isNoDictionaryPresent) {
-      noDictionaryPresentIndex++;
-    }
-    this.segmentproperties = segmentProperties;
-    measureList = segmentProperties.getMeasures();
-  }
-
-  /**
-   * Converts the raw result to the format understood by the data writer.
-   * @param carbonTuple
-   * @return
-   */
-  public Object[] getObjectArray(Object[] carbonTuple) {
-    Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
-    int index = 0;
-    // put measures.
-
-    for (int j = 1; j <= measureCount; j++) {
-      row[index++] = carbonTuple[j];
-    }
-
-    // put the no-dictionary dimensions packed into a single byte array
-    if (isNoDictionaryPresent) {
-
-      int noDicCount = segmentproperties.getNumberOfNoDictionaryDimension();
-      List<byte[]> noDicByteArr = new ArrayList<>(noDicCount);
-      for (int i = 0; i < noDicCount; i++) {
-        noDicByteArr.add(((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeyByIndex(i));
-      }
-      byte[] singleByteArr = RemoveDictionaryUtil.convertListByteArrToSingleArr(noDicByteArr);
-
-      row[index++] = singleByteArr;
-    }
-
-    // put the dictionary key (mdkey)
-    row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
-    return row;
-  }
-}
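
getObjectArray() above reorders a raw compaction tuple into the writer layout: measures first, then (when no-dictionary dimensions exist) all their byte arrays packed into a single byte[], and finally the MDKey of the dictionary dimensions. A hedged illustration of reading that layout back follows; the adapter and carbonTuple instances are assumed to exist in the calling code.

    // Illustration only: a table with two measures and at least one no-dictionary
    // dimension, so the writer row is [measure1, measure2, packedNoDictBytes, mdKey].
    Object[] writerRow = adapter.getObjectArray(carbonTuple);
    Object measure1          = writerRow[0];
    Object measure2          = writerRow[1];
    byte[] packedNoDictBytes = (byte[]) writerRow[2];  // absent when there are no such dims
    byte[] mdKey             = (byte[]) writerRow[3];  // would be writerRow[2] in that case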

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
deleted file mode 100644
index de7d4a2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-
-import java.util.Locale;
-
-public class MalformedCarbonCommandException extends Exception {
-
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public MalformedCarbonCommandException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public MalformedCarbonCommandException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object representing a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override
-  public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}
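
MalformedCarbonCommandException is what the Spark command layer throws when a DDL or load option cannot be parsed. A short sketch of typical use; the property name and validation rule below are hypothetical.

    // Hypothetical validation inside a command parser.
    String dictionaryInclude = tableProperties.get("DICTIONARY_INCLUDE");
    if (dictionaryInclude != null && dictionaryInclude.trim().isEmpty()) {
      throw new MalformedCarbonCommandException(
          "DICTIONARY_INCLUDE must name at least one column");
    }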