Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:42 UTC

[04/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
deleted file mode 100644
index c91cec0..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.io.{DataInputStream, InputStreamReader}
-import java.nio.charset.Charset
-import java.text.SimpleDateFormat
-import java.util.regex.Pattern
-
-import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.control.Breaks.{break, breakable}
-
-import au.com.bytecode.opencsv.CSVReader
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
-
-/**
- * A partitioner that partitions by column.
- *
- * @constructor create a partitioner
- * @param numParts the number of partitions
- */
-class ColumnPartitioner(numParts: Int) extends Partitioner {
-  override def numPartitions: Int = numParts
-
-  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-}
-
-trait GenericParser {
-  val dimension: CarbonDimension
-
-  def addChild(child: GenericParser): Unit
-
-  def parseString(input: String): Unit
-}
-
-case class DictionaryStats(distinctValues: java.util.List[String],
-    dictWriteTime: Long, sortIndexWriteTime: Long)
-
-case class PrimitiveParser(dimension: CarbonDimension,
-    setOpt: Option[HashSet[String]]) extends GenericParser {
-  val (hasDictEncoding, set: HashSet[String]) = setOpt match {
-    case None => (false, new HashSet[String])
-    case Some(x) => (true, x)
-  }
-
-  def addChild(child: GenericParser): Unit = {
-  }
-
-  def parseString(input: String): Unit = {
-    if (hasDictEncoding && input != null) {
-      set.add(input)
-    }
-  }
-}
-
-case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser {
-  var children: GenericParser = _
-
-  def addChild(child: GenericParser): Unit = {
-    children = child
-  }
-
-  def parseString(input: String): Unit = {
-    if (StringUtils.isNotEmpty(input)) {
-      val splits = format.getSplits(input)
-      if (ArrayUtils.isNotEmpty(splits)) {
-        splits.foreach { s =>
-          children.parseString(s)
-        }
-      }
-    }
-  }
-}
-
-case class StructParser(dimension: CarbonDimension,
-    format: DataFormat) extends GenericParser {
-  val children = new ArrayBuffer[GenericParser]
-
-  def addChild(child: GenericParser): Unit = {
-    children += child
-  }
-
-  def parseString(input: String): Unit = {
-    if (StringUtils.isNotEmpty(input)) {
-      val splits = format.getSplits(input)
-      val len = Math.min(children.length, splits.length)
-      for (i <- 0 until len) {
-        children(i).parseString(splits(i))
-      }
-    }
-  }
-}
-
-case class DataFormat(delimiters: Array[String],
-    var delimiterIndex: Int,
-    patterns: Array[Pattern]) extends Serializable {
-  self =>
-  def getSplits(input: String): Array[String] = {
-    // split with limit -1 so that a trailing empty column is kept; the surrogate key has to be
-    // generated for the empty value too
-    patterns(delimiterIndex).split(input, -1)
-  }
-
-  def cloneAndIncreaseIndex: DataFormat = {
-    DataFormat(delimiters, Math.min(delimiterIndex + 1, delimiters.length - 1), patterns)
-  }
-}
-
-/**
- * A case class packaging the attributes needed for dictionary loading
- */
-case class DictionaryLoadModel(table: CarbonTableIdentifier,
-    dimensions: Array[CarbonDimension],
-    hdfsLocation: String,
-    dictfolderPath: String,
-    dictFilePaths: Array[String],
-    dictFileExists: Array[Boolean],
-    isComplexes: Array[Boolean],
-    primDimensions: Array[CarbonDimension],
-    delimiters: Array[String],
-    highCardIdentifyEnable: Boolean,
-    highCardThreshold: Int,
-    rowCountPercentage: Double,
-    columnIdentifier: Array[ColumnIdentifier],
-    isFirstLoad: Boolean,
-    hdfsTempLocation: String,
-    lockType: String,
-    zooKeeperUrl: String,
-    serializationNullFormat: String) extends Serializable
-
-case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
-
-/**
- * An RDD to combine all dictionary distinct values.
- *
- * @constructor create an RDD from an RDD[(String, Iterable[String])]
- * @param prev  the input RDD[(String, Iterable[String])]
- * @param model the dictionary load model packaging the load info
- */
-class CarbonAllDictionaryCombineRDD(
-    prev: RDD[(String, Iterable[String])],
-    model: DictionaryLoadModel)
-  extends RDD[(Int, ColumnDistinctValues)](prev) {
-
-  override def getPartitions: Array[Partition] = {
-    firstParent[(String, Iterable[String])].partitions
-  }
-
-  override def compute(split: Partition, context: TaskContext
-  ): Iterator[(Int, ColumnDistinctValues)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
-    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
-    /*
-     * for an all-dictionary load, every column needs encoding and the
-     * isHighCardinalityColumn check, so there is no need to calculate the row count
-     */
-    val rowCount = 0L
-    try {
-      val dimensionParsers =
-        GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
-      val dimNum = model.dimensions.length
-      // Map[dimColName -> dimColNameIndex]
-      val columnIndexMap = new HashMap[String, Int]()
-      for (j <- 0 until dimNum) {
-        columnIndexMap.put(model.dimensions(j).getColName, j)
-      }
-
-      var row: (String, Iterable[String]) = null
-      val rddIter = firstParent[(String, Iterable[String])].iterator(split, context)
-      // generate block distinct value set
-      while (rddIter.hasNext) {
-        row = rddIter.next()
-        if (row != null) {
-          columnIndexMap.get(row._1) match {
-            case Some(index) =>
-              for (record <- row._2) {
-                dimensionParsers(index).parseString(record)
-              }
-            case None =>
-          }
-        }
-      }
-    } catch {
-      case ex: Exception =>
-        LOGGER.error(ex)
-        throw ex
-    }
-
-    distinctValuesList.map { iter =>
-      val valueList = iter._2.toArray
-      (iter._1, ColumnDistinctValues(valueList, rowCount))
-    }.iterator
-  }
-}
-
-/**
- * An RDD to combine the distinct values within each block.
- *
- * @constructor create an RDD from an RDD[Row]
- * @param prev  the input RDD[Row]
- * @param model the dictionary load model packaging the load info
- */
-class CarbonBlockDistinctValuesCombineRDD(
-    prev: RDD[Row],
-    model: DictionaryLoadModel)
-  extends RDD[(Int, ColumnDistinctValues)](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[Row].partitions
-
-  override def compute(split: Partition,
-      context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
-    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
-    var rowCount = 0L
-    try {
-      val dimensionParsers =
-        GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
-      val dimNum = model.dimensions.length
-      var row: Row = null
-      val rddIter = firstParent[Row].iterator(split, context)
-      val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-          .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-      val format = new SimpleDateFormat(formatString)
-      // generate block distinct value set
-      while (rddIter.hasNext) {
-        row = rddIter.next()
-        if (row != null) {
-          rowCount += 1
-          for (i <- 0 until dimNum) {
-            dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
-                model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
-          }
-        }
-      }
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
-    } catch {
-      case ex: Exception =>
-        LOGGER.error(ex)
-        throw ex
-    }
-
-    distinctValuesList.map { iter =>
-      val valueList = iter._2.toArray
-      (iter._1, ColumnDistinctValues(valueList, rowCount))
-    }.iterator
-  }
-}
-
-/**
- * An RDD to generate the dictionary file for each column
- *
- * @constructor create an RDD from an RDD[(Int, ColumnDistinctValues)]
- * @param prev  the input RDD[(Int, ColumnDistinctValues)]
- * @param model the dictionary load model packaging the load info
- */
-class CarbonGlobalDictionaryGenerateRDD(
-    prev: RDD[(Int, ColumnDistinctValues)],
-    model: DictionaryLoadModel)
-  extends RDD[(Int, String, Boolean)](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-    var isHighCardinalityColumn = false
-    val iter = new Iterator[(Int, String, Boolean)] {
-      var dictionaryForDistinctValueLookUp:
-      org.apache.carbondata.core.cache.dictionary.Dictionary = _
-      var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _
-      var dictionaryForDistinctValueLookUpCleared: Boolean = false
-      val pathService = CarbonCommonFactory.getPathService
-      val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table)
-      if (StringUtils.isNotBlank(model.hdfsTempLocation )) {
-         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
-           model.hdfsTempLocation)
-      }
-      if (StringUtils.isNotBlank(model.lockType)) {
-        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
-          model.lockType)
-      }
-      if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
-        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
-          model.zooKeeperUrl)
-      }
-      val dictLock = CarbonLockFactory
-        .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
-          model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
-      var isDictionaryLocked = false
-      // generate distinct value list
-      try {
-        val t1 = System.currentTimeMillis
-        val valuesBuffer = new mutable.HashSet[String]
-        val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
-        var rowCount = 0L
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
-        breakable {
-          while (rddIter.hasNext) {
-            val distinctValueList = rddIter.next()._2
-            valuesBuffer ++= distinctValueList.values
-            rowCount += distinctValueList.rowCount
-            // check high cardinality
-            if (model.isFirstLoad && model.highCardIdentifyEnable
-                && !model.isComplexes(split.index)
-                && model.dimensions(split.index).isColumnar) {
-              isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
-                valuesBuffer.size, rowCount, model)
-              if (isHighCardinalityColumn) {
-                break
-              }
-            }
-          }
-        }
-        val combineListTime = System.currentTimeMillis() - t1
-        if (isHighCardinalityColumn) {
-          LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
-                      s"${
-                        model.primDimensions(split.index)
-                          .getColName
-                      } is high cardinality column")
-        } else {
-          isDictionaryLocked = dictLock.lockWithRetries()
-          if (isDictionaryLocked) {
-            logInfo(s"Successfully able to get the dictionary lock for ${
-              model.primDimensions(split.index).getColName
-            }")
-          } else {
-            sys
-              .error(s"Dictionary file ${
-                model.primDimensions(split.index).getColName
-              } is locked for update. Please try after some time")
-          }
-          val t2 = System.currentTimeMillis
-          val fileType = FileFactory.getFileType(model.dictFilePaths(split.index))
-          model.dictFileExists(split.index) = FileFactory
-            .isFileExist(model.dictFilePaths(split.index), fileType)
-          dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
-            CarbonLoaderUtil.getDictionary(model.table,
-              model.columnIdentifier(split.index),
-              model.hdfsLocation,
-              model.primDimensions(split.index).getDataType
-            )
-          } else {
-            null
-          }
-          val dictCacheTime = System.currentTimeMillis - t2
-          val t3 = System.currentTimeMillis()
-          val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
-            dictionaryForDistinctValueLookUp,
-            model,
-            split.index)
-          // execute dictionary writer task to get distinct values
-          val distinctValues = dictWriteTask.execute()
-          val dictWriteTime = System.currentTimeMillis() - t3
-          val t4 = System.currentTimeMillis()
-          // if new data came, then rewrite the sort index file
-          if (distinctValues.size() > 0) {
-            val sortIndexWriteTask = new SortIndexWriterTask(model,
-              split.index,
-              dictionaryForDistinctValueLookUp,
-              distinctValues)
-            sortIndexWriteTask.execute()
-          }
-          val sortIndexWriteTime = System.currentTimeMillis() - t4
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
-          // After sortIndex writing, update dictionaryMeta
-          dictWriteTask.updateMetaData()
-          // clear the value buffer after writing dictionary data
-          valuesBuffer.clear
-          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
-          dictionaryForDistinctValueLookUpCleared = true
-          LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
-                      s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
-                      s"\n new distinct values count: ${ distinctValues.size() }" +
-                      s"\n combine lists: $combineListTime" +
-                      s"\n create dictionary cache: $dictCacheTime" +
-                      s"\n sort list, distinct and write: $dictWriteTime" +
-                      s"\n write sort info: $sortIndexWriteTime")
-        }
-      } catch {
-        case ex: Exception =>
-          LOGGER.error(ex)
-          throw ex
-      } finally {
-        if (!dictionaryForDistinctValueLookUpCleared) {
-          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
-        }
-        CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
-        if (dictLock != null && isDictionaryLocked) {
-          if (dictLock.unlock()) {
-            logInfo(s"Dictionary ${
-              model.primDimensions(split.index).getColName
-            } Unlocked Successfully.")
-          } else {
-            logError(s"Unable to unlock Dictionary ${
-              model.primDimensions(split.index).getColName
-            }")
-          }
-        }
-      }
-      var finished = false
-
-      override def hasNext: Boolean = {
-
-        if (!finished) {
-          finished = true
-          finished
-        } else {
-          !finished
-        }
-      }
-
-      override def next(): (Int, String, Boolean) = {
-        (split.index, status, isHighCardinalityColumn)
-      }
-    }
-
-    iter
-  }
-
-}
-
-/**
- * Partition for a column that has a predefined dictionary
- *
- * @param id        partition id
- * @param dimension current carbon dimension
- */
-class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
-  extends Partition {
-  override val index: Int = id
-  val preDefDictDimension = dimension
-}
-
-
-/**
- * Use external column dict to generate global dictionary
- *
- * @param carbonLoadModel carbon load model
- * @param sparkContext    spark context
- * @param table           carbon table identifier
- * @param dimensions      carbon dimensions having predefined dict
- * @param hdfsLocation    carbon base store path
- * @param dictFolderPath  path of dictionary folder
- */
-class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
-    dictionaryLoadModel: DictionaryLoadModel,
-    sparkContext: SparkContext,
-    table: CarbonTableIdentifier,
-    dimensions: Array[CarbonDimension],
-    hdfsLocation: String,
-    dictFolderPath: String)
-  extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
-
-  override def getPartitions: Array[Partition] = {
-    val primDimensions = dictionaryLoadModel.primDimensions
-    val primDimLength = primDimensions.length
-    val result = new Array[Partition](primDimLength)
-    for (i <- 0 until primDimLength) {
-      result(i) = new CarbonColumnDictPatition(i, primDimensions(i))
-    }
-    result
-  }
-
-  override def compute(split: Partition, context: TaskContext)
-  : Iterator[(Int, ColumnDistinctValues)] = {
-    val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
-    val primDimension = theSplit.preDefDictDimension
-    // read the column dict data
-    val preDefDictFilePath = carbonLoadModel.getPredefDictFilePath(primDimension)
-    var csvReader: CSVReader = null
-    var inputStream: DataInputStream = null
-    var colDictData: java.util.Iterator[Array[String]] = null
-    try {
-      inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
-        FileFactory.getFileType(preDefDictFilePath))
-      csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
-        carbonLoadModel.getCsvDelimiter.charAt(0))
-      // read the column data to list iterator
-      colDictData = csvReader.readAll.iterator
-    } catch {
-      case ex: Exception =>
-        logError(s"Error in reading pre-defined " +
-                 s"dictionary file:${ ex.getMessage }")
-        throw ex
-    } finally {
-      if (csvReader != null) {
-        try {
-          csvReader.close()
-        } catch {
-          case ex: Exception =>
-            logError(s"Error in closing csvReader of " +
-                     s"pre-defined dictionary file:${ ex.getMessage }")
-        }
-      }
-      if (inputStream != null) {
-        try {
-          inputStream.close()
-        } catch {
-          case ex: Exception =>
-            logError(s"Error in closing inputStream of " +
-                     s"pre-defined dictionary file:${ ex.getMessage }")
-        }
-      }
-    }
-    val mapIdWithSet = new HashMap[String, HashSet[String]]
-    val columnValues = new HashSet[String]
-    val distinctValues = (theSplit.index, columnValues)
-    mapIdWithSet.put(primDimension.getColumnId, columnValues)
-    // use parser to generate new dict value
-    val dimensionParser = GlobalDictionaryUtil.generateParserForDimension(
-      Some(primDimension),
-      createDataFormat(carbonLoadModel.getDelimiters),
-      mapIdWithSet).get
-    // parse the column data
-    while (colDictData.hasNext) {
-      dimensionParser.parseString(colDictData.next()(0))
-    }
-    Array((distinctValues._1,
-      ColumnDistinctValues(distinctValues._2.toArray, 0L))).iterator
-  }
-}
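
The comment inside DataFormat.getSplits above hinges on the negative limit passed to Pattern.split:
with limit -1 a trailing empty column survives the split, so a surrogate key can still be generated
for the empty value. A minimal standalone Scala sketch of that behaviour (plain JDK regex with a
hypothetical '$' delimiter, no CarbonData dependencies):

    import java.util.regex.Pattern

    object SplitLimitDemo {
      def main(args: Array[String]): Unit = {
        val pattern = Pattern.compile(Pattern.quote("$"))
        // limit -1 keeps trailing empty strings, so an empty last column is preserved
        println(pattern.split("a$b$", -1).length) // 3 -> "a", "b" and the empty string
        // the default call drops trailing empty strings
        println(pattern.split("a$b$").length)     // 2 -> "a", "b"
      }
    }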

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
deleted file mode 100644
index 249f2cd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.util
-import java.util.{Collections, List}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
-import org.apache.spark.sql.hive.DistributionUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.splits.TableSplit
-
-
-class CarbonMergerRDD[K, V](
-    sc: SparkContext,
-    result: MergeResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    carbonMergerMapping: CarbonMergerMapping,
-    confExecutorsTemp: String)
-  extends RDD[(K, V)](sc, Nil) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-  sc.setLocalProperty("spark.job.interruptOnCancel", "true")
-
-  var storeLocation: String = null
-  val storePath = carbonMergerMapping.storePath
-  val metadataFilePath = carbonMergerMapping.metadataFilePath
-  val mergedLoadName = carbonMergerMapping.mergedLoadName
-  val databaseName = carbonMergerMapping.databaseName
-  val factTableName = carbonMergerMapping.factTableName
-  val tableId = carbonMergerMapping.tableId
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val iter = new Iterator[(K, V)] {
-
-      carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-      val tempLocationKey: String = CarbonCommonConstants
-                                      .COMPACTION_KEY_WORD + '_' + carbonLoadModel
-                                      .getDatabaseName + '_' + carbonLoadModel
-                                      .getTableName + '_' + carbonLoadModel.getTaskNo
-
-      // this property determines whether the temp location for carbon is inside the
-      // container temp dir or the yarn application directory.
-      val carbonUseLocalDir = CarbonProperties.getInstance()
-        .getProperty("carbon.use.local.dir", "false")
-
-      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.nonEmpty) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-      } else {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
-      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
-      var mergeStatus = false
-      var mergeNumber = ""
-      var exec: CarbonCompactionExecutor = null
-      try {
-        val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
-
-        // get the destination segment properties sent from the driver (taken from the last segment).
-
-        val segmentProperties = new SegmentProperties(
-          carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
-          carbonMergerMapping.maxSegmentColCardinality)
-
-        // sorting the table block info List.
-        val splitList = carbonSparkPartition.split.value.getAllSplits
-        val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
-
-        Collections.sort(tableBlockInfoList)
-
-        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
-          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
-
-        val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
-          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
-
-        carbonLoadModel.setStorePath(storePath)
-
-        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
-          factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-          dataFileMetadataSegMapping
-        )
-
-        // fire a query and get the results.
-        var result2: util.List[RawResultIterator] = null
-        try {
-          result2 = exec.processTableBlocks()
-        } catch {
-          case e: Throwable =>
-            if (null != exec) {
-              exec.finish()
-            }
-            LOGGER.error(e)
-            if (null != e.getMessage) {
-              sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
-            } else {
-              sys.error("Exception occurred in query execution.Please check logs.")
-            }
-        }
-        mergeNumber = mergedLoadName
-          .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-                     CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
-          )
-
-        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-          factTableName,
-          carbonLoadModel.getTaskNo,
-          "0",
-          mergeNumber,
-          true
-        )
-
-        carbonLoadModel.setSegmentId(mergeNumber)
-        carbonLoadModel.setPartitionId("0")
-        val merger =
-          new RowResultMerger(result2,
-            databaseName,
-            factTableName,
-            segmentProperties,
-            tempStoreLoc,
-            carbonLoadModel,
-            carbonMergerMapping.maxSegmentColCardinality
-          )
-        mergeStatus = merger.mergerSlice()
-
-      } catch {
-        case e: Exception =>
-          LOGGER.error(e)
-          throw e
-      } finally {
-        // delete temp location data
-        val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
-        try {
-          val isCompactionFlow = true
-          CarbonLoaderUtil
-            .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
-        } catch {
-          case e: Exception =>
-            LOGGER.error(e)
-        }
-        if (null != exec) {
-          exec.finish
-        }
-      }
-
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished) {
-          finished = true
-          finished
-        } else {
-          !finished
-        }
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey(0, mergeStatus)
-      }
-
-    }
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonSparkPartition]
-    theSplit.split.value.getLocations.filter(_ != "localhost")
-  }
-
-  override def getPartitions: Array[Partition] = {
-    val startTime = System.currentTimeMillis()
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-      storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
-    )
-    val jobConf: JobConf = new JobConf(new Configuration)
-    val job: Job = new Job(jobConf)
-    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
-    var defaultParallelism = sparkContext.defaultParallelism
-    val result = new util.ArrayList[Partition](defaultParallelism)
-
-    // mapping of the node and block list.
-    var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
-        util.HashMap[String, util.List[Distributable]]
-
-    val noOfBlocks = 0
-    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
-
-    // for each valid segment.
-    for (eachSeg <- carbonMergerMapping.validSegments) {
-
-      // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
-
-      // get splits
-      val splits = format.getSplits(job)
-      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-    }
-
-    // prepare the details required to extract the segment properties using last segment.
-    if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
-      val carbonInputSplit = carbonInputSplits.last
-      var dataFileFooter: DataFileFooter = null
-
-      try {
-        dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
-          carbonInputSplit.getStart, carbonInputSplit.getLength)
-      } catch {
-        case e: CarbonUtilException =>
-          logError("Exception in preparing the data file footer for compaction " + e.getMessage)
-          throw e
-      }
-
-      carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
-        .getColumnCardinality
-      carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
-        .toList
-    }
-    // send complete list of blocks to the mapping util.
-    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
-      carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
-
-    val confExecutors = confExecutorsTemp.toInt
-    val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
-      confExecutors
-    } else { nodeBlockMapping.size() }
-    CarbonEnv.ensureExecutors(sparkContext, requiredExecutors)
-    logInfo("No.of Executors required=" + requiredExecutors +
-            " , spark.executor.instances=" + confExecutors +
-            ", no.of.nodes where data present=" + nodeBlockMapping.size())
-    var nodes = DistributionUtil.getNodeList(sparkContext)
-    var maxTimes = 30
-    while (nodes.length < requiredExecutors && maxTimes > 0) {
-      Thread.sleep(500)
-      nodes = DistributionUtil.getNodeList(sparkContext)
-      maxTimes = maxTimes - 1
-    }
-    logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
-    defaultParallelism = sparkContext.defaultParallelism
-    var i = 0
-
-    val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
-
-    // Create Spark Partition for each task and assign blocks
-    nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
-      val taskBlockList = new util.ArrayList[NodeInfo](0)
-      nodeTaskBlocksMap.put(nodeName, taskBlockList)
-      var blockletCount = 0
-      blockList.asScala.foreach { taskInfo =>
-        val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
-        blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
-        taskBlockList.add(
-          NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
-      }
-      if (blockletCount != 0) {
-        val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
-          carbonInputSplits.asJava, nodeName)
-        result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
-        i += 1
-      }
-    }
-
-    // print the node info along with task and number of blocks for the task.
-
-    nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
-      logInfo(s"for the node ${ entry._1 }")
-      for (elem <- entry._2.asScala) {
-        logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
-      }
-    })
-
-    val noOfNodes = nodes.length
-    val noOfTasks = result.size
-    logInfo(s"Identified  no.of.Blocks: $noOfBlocks," +
-            s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
-    logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
-    for (j <- 0 until result.size ) {
-      val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
-      val splitList = multiBlockSplit.getAllSplits
-      logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
-              s"${CarbonInputSplit.createBlocks(splitList).size}")
-    }
-    result.toArray(new Array[Partition](result.size))
-  }
-
-}
-
-class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
-  extends Partition {
-
-  override val index: Int = idx
-  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
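
Both CarbonGlobalDictionaryGenerateRDD.compute in the previous file and CarbonMergerRDD.compute above
hand back a single status value through an anonymous Iterator whose hasNext returns true exactly once.
A minimal standalone sketch of that one-shot pattern, written slightly more conventionally (the flag
flips in next() rather than in hasNext; the names and the result tuple are illustrative, not CarbonData APIs):

    object OneShotIteratorDemo {
      // Wraps a single result in an Iterator, mirroring the finished-flag idiom
      // used by the compute() implementations above.
      class OneShotIterator[T](result: T) extends Iterator[T] {
        private var finished = false
        override def hasNext: Boolean = !finished
        override def next(): T = { finished = true; result }
      }

      def main(args: Array[String]): Unit = {
        val iter = new OneShotIterator((0, "SUCCESS", false))
        println(iter.hasNext) // true
        println(iter.next())  // (0,SUCCESS,false)
        println(iter.hasNext) // false
      }
    }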

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e8d7399..cae99d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -47,7 +47,7 @@ class CarbonSparkPartition(
     val rddId: Int,
     val idx: Int,
     @transient val multiBlockSplit: CarbonMultiBlockSplit)
-  extends Partition {
+    extends Partition {
 
   val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
 
@@ -67,9 +67,9 @@ class CarbonScanRDD[V: ClassTag](
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     @transient carbonTable: CarbonTable)
-  extends RDD[V](sc, Nil)
-    with CarbonHadoopMapReduceUtil
-    with Logging {
+    extends RDD[V](sc, Nil)
+        with CarbonHadoopMapReduceUtil
+        with Logging {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
deleted file mode 100644
index 6c6076e..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
-import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
-
-/**
- * Compactor object which handles the compaction cases.
- */
-object Compactor {
-
-  val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
-
-  def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
-
-    val storePath = compactionCallableModel.storePath
-    val storeLocation = compactionCallableModel.storeLocation
-    val carbonTable = compactionCallableModel.carbonTable
-    val kettleHomePath = compactionCallableModel.kettleHomePath
-    val cubeCreationTime = compactionCallableModel.cubeCreationTime
-    val loadsToMerge = compactionCallableModel.loadsToMerge
-    val sc = compactionCallableModel.sqlContext
-    val carbonLoadModel = compactionCallableModel.carbonLoadModel
-    val compactionType = compactionCallableModel.compactionType
-
-    val startTime = System.nanoTime()
-    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
-    var finalMergeStatus = false
-    val schemaName: String = carbonLoadModel.getDatabaseName
-    val factTableName = carbonLoadModel.getTableName
-    val validSegments: Array[String] = CarbonDataMergerUtil
-      .getValidSegments(loadsToMerge).split(',')
-    val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
-    val carbonMergerMapping = CarbonMergerMapping(storeLocation,
-      storePath,
-      carbonTable.getMetaDataFilepath,
-      mergedLoadName,
-      kettleHomePath,
-      cubeCreationTime,
-      schemaName,
-      factTableName,
-      validSegments,
-      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-      maxSegmentColCardinality = null,
-      maxSegmentColumnSchemaList = null
-    )
-    carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
-    carbonLoadModel.setLoadMetadataDetails(
-      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
-    var execInstance = "1"
-    // in case of non-dynamic executor allocation, the number of executors is fixed.
-    if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
-      execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
-      logger.info(s"spark.executor.instances property is set to = $execInstance")
-    } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
-    else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
-      if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-        .equalsIgnoreCase("true")) {
-        execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
-        logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
-      }
-    }
-
-    val mergeStatus = new CarbonMergerRDD(
-      sc.sparkContext,
-      new MergeResultImpl(),
-      carbonLoadModel,
-      carbonMergerMapping,
-      execInstance
-    ).collect
-
-    if (mergeStatus.length == 0) {
-      finalMergeStatus = false
-    } else {
-      finalMergeStatus = mergeStatus.forall(_._2)
-    }
-
-    if (finalMergeStatus) {
-      val endTime = System.nanoTime()
-      logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
-      if (!CarbonDataMergerUtil
-        .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
-          mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
-        )) {
-        logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
-                     s"${ carbonLoadModel.getTableName }")
-        logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
-                     s"${ carbonLoadModel.getTableName }")
-        throw new Exception(s"Compaction failed to update metadata for table" +
-                            s" ${ carbonLoadModel.getDatabaseName }." +
-                            s"${ carbonLoadModel.getTableName }")
-      } else {
-        logger.audit(s"Compaction request completed for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " +
-                    s".${ carbonLoadModel.getTableName }")
-      }
-    } else {
-      logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
-                   s".${ carbonLoadModel.getTableName }"
-      )
-      logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
-                   s".${ carbonLoadModel.getTableName }")
-      throw new Exception("Compaction Failure in Merger Rdd.")
-    }
-  }
-}
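
The logging calls in triggerCompaction build their messages with Scala string interpolation; the
${ ... } placeholders are only substituted when the literal carries the s prefix, otherwise the text
is emitted verbatim. A tiny standalone check (plain Scala, no CarbonData dependencies; tableName is a
made-up value):

    object InterpolationDemo {
      def main(args: Array[String]): Unit = {
        val tableName = "default.sales"
        // With the s prefix the placeholder is substituted.
        println(s"Compaction request completed for table ${ tableName }")
        // Without it, ${ tableName } is printed literally.
        println("Compaction request completed for table ${ tableName }")
      }
    }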

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 3a393ed..67d1ce0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -51,7 +51,6 @@ class NewCarbonDataLoadRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
     loadCount: Integer,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
     isTableSplitPartition: Boolean)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
deleted file mode 100644
index e23b58d..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.tasks
-
-import java.io.IOException
-
-import scala.collection.mutable
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.spark.rdd.DictionaryLoadModel
-
-/**
- * Task that writes the distinct values of a column to its dictionary file.
- *
- * @param valuesBuffer distinct values collected for the column
- * @param dictionary   existing dictionary used to look up values already written
- * @param model        dictionary load model
- * @param columnIndex  index of the column being processed
- * @param writer       dictionary writer, created inside execute()
- */
-class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
-    dictionary: Dictionary,
-    model: DictionaryLoadModel, columnIndex: Int,
-    var writer: CarbonDictionaryWriter = null) {
-
-  /**
-   * execute the task
-   *
-   * @return the list of newly written distinct values
-   */
-  def execute(): java.util.List[String] = {
-    val values = valuesBuffer.toArray
-    java.util.Arrays.sort(values, Ordering[String])
-    val dictService = CarbonCommonFactory.getDictionaryService
-    writer = dictService.getDictionaryWriter(
-      model.table,
-      model.columnIdentifier(columnIndex),
-      model.hdfsLocation)
-    val distinctValues: java.util.List[String] = new java.util.ArrayList()
-
-    try {
-      if (!model.dictFileExists(columnIndex)) {
-        writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
-        distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
-      }
-
-      if (values.length >= 1) {
-        if (model.dictFileExists(columnIndex)) {
-          for (value <- values) {
-            val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
-                model.primDimensions(columnIndex))
-            if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
-              CarbonCommonConstants.INVALID_SURROGATE_KEY) {
-              writer.write(parsedValue)
-              distinctValues.add(parsedValue)
-            }
-          }
-
-        } else {
-          for (value <- values) {
-            val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
-                model.primDimensions(columnIndex))
-            if (null != parsedValue) {
-              writer.write(parsedValue)
-              distinctValues.add(parsedValue)
-            }
-          }
-        }
-      }
-    } catch {
-      case ex: IOException =>
-        throw ex
-    } finally {
-      if (null != writer) {
-        writer.close()
-      }
-    }
-    distinctValues
-  }
-
-  /**
-   * update dictionary metadata
-   */
-  def updateMetaData() {
-    if (null != writer) {
-      writer.commit()
-    }
-  }
-}
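
DictionaryWriterTask.execute above sorts the collected distinct values with java.util.Arrays.sort and
a Scala Ordering, then writes only those values that are not already present in the dictionary. A
minimal standalone sketch of that sort-and-filter step (a plain Set stands in for the surrogate-key
lookup; the values are made up):

    import scala.collection.mutable

    object DictionaryWriteSketch {
      def main(args: Array[String]): Unit = {
        val valuesBuffer = mutable.HashSet("banana", "apple", "cherry", "apple")
        val alreadyInDictionary = Set("apple") // stands in for dictionary.getSurrogateKey hits

        // Sort lexicographically, as execute() does before writing.
        val values = valuesBuffer.toArray
        java.util.Arrays.sort(values, Ordering[String])

        // Only values without an existing surrogate key become new distinct values.
        val newDistinctValues = values.filterNot(alreadyInDictionary.contains)
        println(newDistinctValues.toList) // List(banana, cherry)
      }
    }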

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
deleted file mode 100644
index d552331..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.tasks
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
-import org.apache.carbondata.spark.rdd.DictionaryLoadModel
-
-/**
- * This task writes the sort index file for a column's dictionary
- *
- * @param model          dictionary load model
- * @param index          index of the column being processed
- * @param dictionary     dictionary of the column
- * @param distinctValues newly added distinct values
- * @param carbonDictionarySortIndexWriter sort index writer, created inside execute()
- */
-class SortIndexWriterTask(model: DictionaryLoadModel,
-    index: Int,
-    dictionary: Dictionary,
-    distinctValues: java.util.List[String],
-    var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null) {
-  def execute() {
-    try {
-      if (distinctValues.size() > 0) {
-        val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
-        val dictService = CarbonCommonFactory.getDictionaryService
-        val dictionarySortInfo: CarbonDictionarySortInfo =
-          preparator.getDictionarySortInfo(distinctValues, dictionary,
-            model.primDimensions(index).getDataType)
-        carbonDictionarySortIndexWriter =
-          dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
-            model.hdfsLocation)
-        carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
-        carbonDictionarySortIndexWriter
-          .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
-      }
-    } finally {
-      if (null != carbonDictionarySortIndexWriter) {
-        carbonDictionarySortIndexWriter.close()
-      }
-    }
-  }
-}
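
SortIndexWriterTask above persists, per column, the order of the dictionary surrogate keys when the
members are sorted, together with the inverse mapping. A rough, conceptual illustration of those two
arrays in plain Scala (the member values and the 1-based keys are assumptions made for the example;
the real structures come from CarbonDictionarySortInfoPreparator):

    object SortIndexSketch {
      def main(args: Array[String]): Unit = {
        // Dictionary members in surrogate-key order: key i + 1 -> members(i).
        val members = Array("delta", "alpha", "charlie")

        // sortIndex(j) = surrogate key of the (j + 1)-th smallest member.
        val sortIndex = members.zipWithIndex.sortBy(_._1).map(_._2 + 1)
        println(sortIndex.toList) // List(2, 3, 1)

        // invertedIndex(key - 1) = 1-based sorted position of that surrogate key.
        val invertedIndex = new Array[Int](members.length)
        sortIndex.zipWithIndex.foreach { case (key, pos) => invertedIndex(key - 1) = pos + 1 }
        println(invertedIndex.toList) // List(3, 1, 2)
      }
    }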

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
deleted file mode 100644
index d91a012..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.io.File
-import java.text.SimpleDateFormat
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonDataType}
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-
-object CarbonScalaUtil {
-  def convertSparkToCarbonDataType(
-      dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
-    dataType match {
-      case StringType => CarbonDataType.STRING
-      case ShortType => CarbonDataType.SHORT
-      case IntegerType => CarbonDataType.INT
-      case LongType => CarbonDataType.LONG
-      case DoubleType => CarbonDataType.DOUBLE
-      case FloatType => CarbonDataType.FLOAT
-      case DateType => CarbonDataType.DATE
-      case BooleanType => CarbonDataType.BOOLEAN
-      case TimestampType => CarbonDataType.TIMESTAMP
-      case ArrayType(_, _) => CarbonDataType.ARRAY
-      case StructType(_) => CarbonDataType.STRUCT
-      case NullType => CarbonDataType.NULL
-      case _ => CarbonDataType.DECIMAL
-    }
-  }
-
-  def convertSparkToCarbonSchemaDataType(dataType: String): String = {
-    dataType match {
-      case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING
-      case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER
-      case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER
-      case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT
-      case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING
-      case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING
-      case CarbonCommonConstants.TIMESTAMP_TYPE => CarbonCommonConstants.TIMESTAMP
-      case anyType => anyType
-    }
-  }
-
-  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
-    dataType match {
-      case CarbonDataType.STRING => StringType
-      case CarbonDataType.SHORT => ShortType
-      case CarbonDataType.INT => IntegerType
-      case CarbonDataType.LONG => LongType
-      case CarbonDataType.DOUBLE => DoubleType
-      case CarbonDataType.BOOLEAN => BooleanType
-      case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
-      case CarbonDataType.TIMESTAMP => TimestampType
-    }
-  }
-
-  def updateDataType(
-      currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
-    currentDataType match {
-      case decimal: DecimalType =>
-        val scale = decimal.scale
-        DecimalType(DecimalType.MAX_PRECISION, scale)
-      case _ =>
-        currentDataType
-    }
-  }
-
-  case class TransformHolder(rdd: Any, metaData: CarbonMetaData)
-
-  object CarbonSparkUtil {
-
-    def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
-      val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-                           .asScala.map(x => x.getColName) // FIXME: may be a problem
-      val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-                        .asScala.map(x => x.getColName)
-      val dictionary =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
-        (f.getColName.toLowerCase,
-          f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-          !CarbonUtil.hasComplexDataType(f.getDataType))
-      }
-      CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
-    }
-  }
-
-  def getKettleHome(sqlContext: SQLContext): String = {
-    var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-    if (null == kettleHomePath) {
-      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-    }
-    if (null == kettleHomePath) {
-      val carbonHome = System.getenv("CARBON_HOME")
-      if (null != carbonHome) {
-        kettleHomePath = carbonHome + "/processing/carbonplugins"
-      }
-    }
-    if (kettleHomePath != null) {
-      val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase()
-      // get the spark master; if it is local, the kettle home may need to be corrected
-      // e.g. with --master local, the executor runs on the local machine
-      if (sparkMaster.startsWith("local")) {
-        val kettleHomeFileType = FileFactory.getFileType(kettleHomePath)
-        val kettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, kettleHomeFileType)
-        // check whether the carbon.kettle.home path exists
-        if (!kettleHomeFile.exists()) {
-          // get the path of this class
-          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-
-          // xxx.jar!/org/carbondata/spark/rdd/
-          var jarFilePath = this.getClass.getResource("").getPath
-          val endIndex = jarFilePath.indexOf(".jar!") + 4
-          // get the jar file path
-          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-*.jar
-          jarFilePath = jarFilePath.substring(0, endIndex)
-          val jarFileType = FileFactory.getFileType(jarFilePath)
-          val jarFile = FileFactory.getCarbonFile(jarFilePath, jarFileType)
-          // get the parent folder of the jar file
-          // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib
-          val carbonLibPath = jarFile.getParentFile.getPath
-          // find the kettle home under the carbonlib folder found above
-          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbonplugins
-          kettleHomePath = carbonLibPath + File.separator + CarbonCommonConstants.KETTLE_HOME_NAME
-          val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-          logger.error(s"carbon.kettle.home path does not exist, resetting it to $kettleHomePath")
-          val newKettleHomeFileType = FileFactory.getFileType(kettleHomePath)
-          val newKettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, newKettleHomeFileType)
-          // check if the found kettle home exists
-          if (!newKettleHomeFile.exists()) {
-            sys.error("Kettle home not found. Failed to reset carbon.kettle.home")
-          }
-        }
-      }
-    } else {
-      sys.error("carbon.kettle.home is not set")
-    }
-    kettleHomePath
-  }
-
-  def getString(value: Any,
-      serializationNullFormat: String,
-      delimiterLevel1: String,
-      delimiterLevel2: String,
-      format: SimpleDateFormat,
-      level: Int = 1): String = {
-    if (value == null) {
-      serializationNullFormat
-    } else {
-      value match {
-        case s: String => s
-        case d: java.math.BigDecimal => d.toPlainString
-        case i: java.lang.Integer => i.toString
-        case d: java.lang.Double => d.toString
-        case t: java.sql.Timestamp => format.format(t)
-        case d: java.sql.Date => format.format(d)
-        case b: java.lang.Boolean => b.toString
-        case s: java.lang.Short => s.toString
-        case f: java.lang.Float => f.toString
-        case bs: Array[Byte] => new String(bs)
-        case s: scala.collection.Seq[Any] =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          s.foreach { x =>
-            builder.append(getString(x, serializationNullFormat, delimiterLevel1,
-                delimiterLevel2, format, level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case m: scala.collection.Map[Any, Any] =>
-          throw new Exception("Unsupported data type: Map")
-        case r: org.apache.spark.sql.Row =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          for (i <- 0 until r.length) {
-            builder.append(getString(r(i), serializationNullFormat, delimiterLevel1,
-                delimiterLevel2, format, level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case other => other.toString
-      }
-    }
-  }
-}
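
Note on the file removed above: most of CarbonScalaUtil is the recursive getString helper that
flattens nested row values (Seq and Row) into delimited text, falling back to toString for scalars.
A minimal self-contained sketch of that recursion follows; the object and method names, the
delimiters "$" and ":", and the null token "\N" are illustrative placeholders, not values taken
from the deleted file.

import java.text.SimpleDateFormat

// Illustrative stand-in for the removed CarbonScalaUtil.getString recursion.
object GetStringSketch {
  def flatten(value: Any, nullFormat: String, delimiterLevel1: String,
      delimiterLevel2: String, format: SimpleDateFormat, level: Int = 1): String = {
    if (value == null) {
      nullFormat
    } else {
      value match {
        case seq: Seq[_] =>
          // top-level elements are joined with the first delimiter, nested ones with the second
          val delimiter = if (level == 1) delimiterLevel1 else delimiterLevel2
          seq.map(x => flatten(x, nullFormat, delimiterLevel1, delimiterLevel2, format, level + 1))
            .mkString(delimiter)
        case t: java.sql.Timestamp => format.format(t)
        case other => other.toString
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // prints a$b:c with the hypothetical delimiters above
    println(flatten(Seq("a", Seq("b", "c")), "\\N", "$", ":", fmt))
  }
}

Running it prints a$b:c, showing how the delimiter switches once the recursion goes one level deep.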

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
deleted file mode 100644
index d5051cf..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.util
-
-import java.util
-import java.util.UUID
-
-import scala.collection.mutable.Map
-
-import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
-
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-object CommonUtil {
-  def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String],
-      msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) {
-    val colGrpCols = colGroup.split(',').map(_.trim)
-    colGrpCols.foreach { x =>
-      // if the column is a no-dictionary column
-      if (noDictionaryDims.contains(x)) {
-        throw new MalformedCarbonCommandException(
-          "Column group is not supported for no dictionary columns:" + x)
-      } else if (msrs.exists(msr => msr.column.equals(x))) {
-        // if the column is a measure
-        throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x)
-      } else if (foundIndExistingColGrp(x)) {
-        throw new MalformedCarbonCommandException("Column is available in other column group:" + x)
-      } else if (isComplex(x, dims)) {
-        throw new MalformedCarbonCommandException(
-          "Column group doesn't support Complex column:" + x)
-      } else if (isTimeStampColumn(x, dims)) {
-        throw new MalformedCarbonCommandException(
-          "Column group doesn't support Timestamp datatype:" + x)
-      } else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
-        // the column in the column group is not present in the
-        // dimension list, so it is invalid
-        throw new MalformedCarbonCommandException(
-          "column in column group is not a valid column: " + x
-        )
-      }
-    }
-    // check if given column is present in other groups
-    def foundIndExistingColGrp(colName: String): Boolean = {
-      retrievedColGrps.foreach { colGrp =>
-        if (colGrp.split(",").contains(colName)) {
-          return true
-        }
-      }
-      false
-    }
-
-  }
-
-
-  def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = {
-    dims.foreach { dim =>
-      if (dim.column.equalsIgnoreCase(colName)) {
-        if (dim.dataType.isDefined && null != dim.dataType.get &&
-            "timestamp".equalsIgnoreCase(dim.dataType.get)) {
-          return true
-        }
-      }
-    }
-    false
-  }
-
-  def isComplex(colName: String, dims: Seq[Field]): Boolean = {
-    dims.foreach { x =>
-      if (x.children.isDefined && null != x.children.get && x.children.get.nonEmpty) {
-        val children = x.children.get
-        if (x.column.equals(colName)) {
-          return true
-        } else {
-          children.foreach { child =>
-            val fieldName = x.column + "." + child.column
-            if (fieldName.equalsIgnoreCase(colName)) {
-              return true
-            }
-          }
-        }
-      }
-    }
-    false
-  }
-
-  def getColumnProperties(column: String,
-      tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = {
-    val fieldProps = new util.ArrayList[ColumnProperty]()
-    val columnPropertiesStartKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
-    tableProperties.foreach {
-      case (key, value) =>
-        if (key.startsWith(columnPropertiesStartKey)) {
-          fieldProps.add(ColumnProperty(key.substring(columnPropertiesStartKey.length(),
-            key.length()), value))
-        }
-    }
-    if (fieldProps.isEmpty) {
-      None
-    } else {
-      Some(fieldProps)
-    }
-  }
-
-  def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = {
-    val itr = tableProperties.keys
-    var isValid: Boolean = true
-    tableProperties.foreach {
-      case (key, value) =>
-        if (!validateFields(key, fields)) {
-          isValid = false
-          throw new MalformedCarbonCommandException(s"Invalid table property: $key")
-        }
-    }
-    isValid
-  }
-
-  def validateFields(key: String, fields: Seq[Field]): Boolean = {
-    var isValid: Boolean = false
-    fields.foreach { field =>
-      if (field.children.isDefined && field.children.get != null) {
-        field.children.foreach(childFields => {
-          childFields.foreach(complexfield => {
-            val column = if ("val" == complexfield.column) {
-              field.column
-            } else {
-              field.column + "." + complexfield.column
-            }
-            if (validateColumnProperty(key, column)) {
-              isValid = true
-            }
-          }
-          )
-        }
-        )
-      } else {
-        if (validateColumnProperty(key, field.column)) {
-          isValid = true
-        }
-      }
-
-    }
-    isValid
-  }
-
-  def validateColumnProperty(key: String, column: String): Boolean = {
-    if (!key.startsWith(CarbonCommonConstants.COLUMN_PROPERTIES)) {
-      return true
-    }
-    val columnPropertyKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
-    if (key.startsWith(columnPropertyKey)) {
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * @param colGrps column groups, each given as a comma separated list of columns
-   * @param dims dimension fields in schema order
-   * @return the column groups sorted by the schema position of their first column
-   */
-  def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): Seq[String] = {
-    def sortByIndex(colGrp1: String, colGrp2: String) = {
-      val firstCol1 = colGrp1.split(",")(0)
-      val firstCol2 = colGrp2.split(",")(0)
-      val dimIndex1: Int = getDimIndex(firstCol1, dims)
-      val dimIndex2: Int = getDimIndex(firstCol2, dims)
-      dimIndex1 < dimIndex2
-    }
-    val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex)
-    sortedColGroups
-  }
-
-  /**
-   * @param colName column name to look up
-   * @param dims dimension fields in schema order
-   * @return index of the given column in dims, or -1 if it is not found
-   */
-  def getDimIndex(colName: String, dims: Seq[Field]): Int = {
-    var index: Int = -1
-    dims.zipWithIndex.foreach { h =>
-      if (h._1.column.equalsIgnoreCase(colName)) {
-        index = h._2.toInt
-      }
-    }
-    index
-  }
-
-  /**
-   * Validates the table block size specified by the user.
-   *
-   * @param tableProperties table properties; the table_blocksize value is normalized in place
-   */
-  def validateTableBlockSize(tableProperties: Map[String, String]): Unit = {
-    var tableBlockSize: Integer = 0
-    if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) {
-      val blockSizeStr: String =
-        parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE))
-      try {
-        tableBlockSize = Integer.parseInt(blockSizeStr)
-      } catch {
-        case e: NumberFormatException =>
-          throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
-                                                    s"$blockSizeStr, only int value from 1 MB to " +
-                                                    s"2048 MB is supported.")
-      }
-      if (tableBlockSize < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL ||
-          tableBlockSize > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) {
-        throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
-                                                  s"$blockSizeStr, only int value from 1 MB to " +
-                                                  s"2048 MB is supported.")
-      }
-      tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, blockSizeStr)
-    }
-  }
-
-  /**
-   * Parses a configured size string such as 'XX MB' or 'XX M' down to plain 'XX'.
-   *
-   * @param propertyValueString the configured value, e.g. '512 MB'
-   */
-  def parsePropertyValueStringInMB(propertyValueString: String): String = {
-    var parsedPropertyValueString: String = propertyValueString
-    if (propertyValueString.trim.toLowerCase.endsWith("mb")) {
-      parsedPropertyValueString = propertyValueString.trim.toLowerCase
-        .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("mb")).trim
-    }
-    if (propertyValueString.trim.toLowerCase.endsWith("m")) {
-      parsedPropertyValueString = propertyValueString.trim.toLowerCase
-        .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("m")).trim
-    }
-    parsedPropertyValueString
-  }
-
-}
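
Note on the file removed above: CommonUtil.validateTableBlockSize first normalizes a
table_blocksize value such as '512 MB' or '512 M' down to the bare number via
parsePropertyValueStringInMB, then range-checks it against the 1 MB to 2048 MB bounds. A small
self-contained sketch of just the normalization step follows; BlockSizeSketch and parseMB are
made-up names, not part of the CarbonData API.

// Illustrative stand-in for the removed parsePropertyValueStringInMB behaviour.
object BlockSizeSketch {
  // strips a trailing "mb" or "m" (case-insensitive) and surrounding whitespace
  def parseMB(value: String): String = {
    val v = value.trim.toLowerCase
    if (v.endsWith("mb")) {
      v.stripSuffix("mb").trim
    } else if (v.endsWith("m")) {
      v.stripSuffix("m").trim
    } else {
      v
    }
  }

  def main(args: Array[String]): Unit = {
    // all of these normalize to "512"
    Seq("512 MB", "512mb", "512 M", "512").foreach(s => println(s"$s -> ${parseMB(s)}"))
  }
}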

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
deleted file mode 100644
index aa8fcd5..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-
-object DataTypeConverterUtil {
-  def convertToCarbonType(dataType: String): DataType = {
-    dataType.toLowerCase match {
-      case "string" => DataType.STRING
-      case "int" => DataType.INT
-      case "integer" => DataType.INT
-      case "tinyint" => DataType.SHORT
-      case "short" => DataType.SHORT
-      case "long" => DataType.LONG
-      case "bigint" => DataType.LONG
-      case "numeric" => DataType.DOUBLE
-      case "double" => DataType.DOUBLE
-      case "decimal" => DataType.DECIMAL
-      case "timestamp" => DataType.TIMESTAMP
-      case "array" => DataType.ARRAY
-      case "struct" => DataType.STRUCT
-      case _ => sys.error(s"Unsupported data type: $dataType")
-    }
-  }
-
-  def convertToString(dataType: DataType): String = {
-    dataType match {
-      case DataType.STRING => "string"
-      case DataType.SHORT => "smallint"
-      case DataType.INT => "int"
-      case DataType.LONG => "bigint"
-      case DataType.DOUBLE => "double"
-      case DataType.DECIMAL => "decimal"
-      case DataType.TIMESTAMP => "timestamp"
-      case DataType.ARRAY => "array"
-      case DataType.STRUCT => "struct"
-    }
-  }
-}
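
Note on the file removed above: convertToCarbonType and convertToString are not exact inverses,
because several SQL spellings map onto one carbon type ('tinyint' and 'short' both become SHORT,
which converts back to 'smallint'; 'numeric' becomes DOUBLE). The sketch below demonstrates this
with a stand-in enum so it compiles without the carbondata jars; ConverterSketch and its members
are hypothetical names.

// Stand-in types so the example compiles without carbondata on the classpath.
object ConverterSketch {
  sealed trait CarbonType
  case object SHORT extends CarbonType
  case object DOUBLE extends CarbonType

  def toCarbon(dataType: String): CarbonType = dataType.toLowerCase match {
    case "tinyint" | "short" => SHORT
    case "numeric" | "double" => DOUBLE
    case other => sys.error(s"Unsupported data type: $other")
  }

  def toSqlString(carbonType: CarbonType): String = carbonType match {
    case SHORT => "smallint"
    case DOUBLE => "double"
  }

  def main(args: Array[String]): Unit = {
    // "tinyint" round-trips to "smallint", not back to "tinyint"
    println(toSqlString(toCarbon("tinyint")))
  }
}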