Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:47 UTC

[09/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..af349a8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet}
+import scala.collection.mutable
+
+import org.apache.spark.Partition
+import org.apache.spark.scheduler.TaskLocation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * DataLoadPartitionCoalescer
+ * Repartition the partitions of the RDD into fewer partitions, at most one partition per node.
+ * example:
+ * blk_hst  host1 host2 host3 host4 host5
+ * block1   host1 host2 host3
+ * block2         host2       host4 host5
+ * block3               host3 host4 host5
+ * block4   host1 host2       host4
+ * block5   host1       host3 host4
+ * block6   host1 host2             host5
+ * -------------------------------------------------------
+ * 1. sort host by number of blocks
+ * -------------------------------------------------------
+ * host3: block1 block3 block5
+ * host5: block2 block3 block6
+ * host1: block1 block4 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 2. sort the blocks of each host
+ * new partitions (not yet seen on a previous host) come before old partitions
+ * -------------------------------------------------------
+ * host3:                      block1 block3 block5
+ * host5:        block2 block6+block3
+ * host1: block4+block1 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 3. assign blocks to host
+ * -------------------------------------------------------
+ * step1: host3 chooses block1, remove it from host1 and host2
+ * step2: host5 chooses block2, remove it from host2 and host4
+ * step3: host1 chooses block4, .....
+ * -------------------------------------------------------
+ * result:
+ * host3:                      block1       block5
+ * host5:        block2
+ * host1: block4
+ * host2:                      block6
+ * host4:        block3
+ */
+class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val prevPartitions = prev.partitions
+  var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+  // host => partition id list
+  val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
+  // partition id => host list
+  val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
+  val noLocalityPartitions = new ArrayBuffer[Int]
+  var noLocality = true
+  /**
+   * assign a task location for a partition
+   */
+  private def getLocation(index: Int): Option[String] = {
+    if (index < nodeList.length) {
+      Some(nodeList(index))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * collect partitions for each node
+   */
+  private def groupByNode(): Unit = {
+    // initialize hostMapPartitionIds
+    nodeList.foreach { node =>
+      val map = new LinkedHashSet[Int]
+      hostMapPartitionIds.put(node, map)
+    }
+    // collect partitions for each node
+    val tmpNoLocalityPartitions = new ArrayBuffer[Int]
+    prevPartitions.foreach { p =>
+      val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
+      if (locs.isEmpty) {
+        // if a partition has no location, add to noLocalityPartitions
+        tmpNoLocalityPartitions += p.index
+      } else {
+        // add partition to hostMapPartitionIds and partitionIdMapHosts
+        locs.foreach { loc =>
+          val host = loc.host
+          hostMapPartitionIds.get(host) match {
+            // if the location of the partition is not in the node list,
+            // add this partition to the temporary no-locality list
+            case None => tmpNoLocalityPartitions += p.index
+            case Some(ids) =>
+              noLocality = false
+              ids += p.index
+              partitionIdMapHosts.get(p.index) match {
+                case None =>
+                  val hosts = new ArrayBuffer[String]
+                  hosts += host
+                  partitionIdMapHosts.put(p.index, hosts)
+                case Some(hosts) =>
+                  hosts += host
+              }
+          }
+        }
+      }
+    }
+
+    // keep only the partitions that have no locality on any node in the node list
+    tmpNoLocalityPartitions.distinct.foreach {index =>
+      partitionIdMapHosts.get(index) match {
+        case None => noLocalityPartitions += index
+        case Some(_) =>
+      }
+    }
+  }
+
+  /**
+   * sort host and partitions
+   */
+  private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = {
+    val oldPartitionIdSet = new HashSet[Int]
+    // sort host by number of partitions
+    hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc =>
+      // order: newPartitionIds + oldPartitionIds
+      val sortedPartitionIdSet = new LinkedHashSet[Int]
+      var newPartitionIds = new ArrayBuffer[Int]
+      var oldPartitionIds = new ArrayBuffer[Int]
+      loc._2.foreach { p =>
+        if (oldPartitionIdSet.contains(p)) {
+          oldPartitionIds += p
+        } else {
+          newPartitionIds += p
+          oldPartitionIdSet.add(p)
+        }
+      }
+      // sort and add new partitions
+      newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+      // sort and add old partitions
+      oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+      // update hostMapPartitionIds
+      hostMapPartitionIds.put(loc._1, sortedPartitionIdSet)
+      (loc._1, sortedPartitionIdSet)
+    }.toArray
+  }
+
+  /**
+   * assign partitions with data locality to each host
+   */
+  private def assignPartitonNodeLocality(
+      noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = {
+    val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length)
+    for (i <- 0 until localityResult.length) {
+      localityResult(i) = new ArrayBuffer[Int]
+    }
+    val noEmptyHostSet = new HashSet[String]
+    noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)}
+
+    var hostIndex = 0
+    while (noEmptyHostSet.nonEmpty) {
+      val hostEntry = noEmptyHosts(hostIndex)
+      if (noEmptyHostSet.contains(hostEntry._1)) {
+        if (hostEntry._2.nonEmpty) {
+          var partitionId = hostEntry._2.iterator.next
+          localityResult(hostIndex) += partitionId
+          // remove this partition from every host that contains it
+          partitionIdMapHosts.get(partitionId) match {
+            case Some(locs) =>
+              locs.foreach { loc =>
+                hostMapPartitionIds.get(loc) match {
+                  case Some(parts) =>
+                    parts.remove(partitionId)
+                }
+              }
+          }
+        } else {
+          noEmptyHostSet.remove(hostEntry._1)
+        }
+      }
+
+      hostIndex = hostIndex + 1
+      if (hostIndex == noEmptyHosts.length) {
+        hostIndex = 0
+      }
+    }
+    localityResult
+  }
+
+  /**
+   * assign no locality partitions to each host
+   */
+  private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String],
+      noEmptyHosts: mutable.Buffer[String],
+      localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = {
+    val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length)
+    LOGGER.info(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}")
+    val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length)
+    for (i <- 0 until noLocalityResult.length) {
+      noLocalityResult(i) = new ArrayBuffer[Int]
+    }
+    var noLocalityPartitionIndex = 0
+    if (noLocalityPartitions.nonEmpty) {
+      if (emptyHosts.nonEmpty) {
+        // at first, assign avg number to empty node
+        for (i <- 0 until avgNumber) {
+          noLocalityResult.foreach { partitionIds =>
+            if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+              partitionIds += noLocalityPartitions(noLocalityPartitionIndex)
+              noLocalityPartitionIndex = noLocalityPartitionIndex + 1
+            }
+          }
+        }
+      }
+      // still have no locality partitions
+      // assign to all hosts
+      if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+        var partIndex = 0
+        for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) {
+          if (partIndex < localityResult.length) {
+            localityResult(partIndex) += noLocalityPartitions(i)
+          } else {
+            noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i)
+          }
+          partIndex = partIndex + 1
+          if (partIndex == localityResult.length + noLocalityResult.length) {
+            partIndex = 0
+          }
+        }
+      }
+    }
+    noLocalityResult
+  }
+
+  /**
+   * no locality repartition
+   */
+  private def repartitionNoLocality(): Array[Partition] = {
+    // no locality repartition
+    LOGGER.info("no locality partition")
+    val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts)
+    for (i <- 0 until numOfParts) {
+      prevPartIndexs(i) = new ArrayBuffer[Int]
+    }
+    for (i <- 0 until prevPartitions.length) {
+      prevPartIndexs(i % numOfParts) += prevPartitions(i).index
+    }
+    prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
+      new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
+    }
+  }
+
+  private def repartitionLocality(): Array[Partition] = {
+    LOGGER.info("locality partition")
+    val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq
+    // empty host seq
+    val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer
+    // non empty host array
+    var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty)
+
+    // 1. do locality repartition
+    // sort host and partitions
+    tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts)
+    // assign locality partition to non empty hosts
+    val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts)
+    // collect non empty hosts and empty hosts
+    val noEmptyHosts = mutable.Buffer[String]()
+    val localityResult = mutable.Buffer[ArrayBuffer[Int]]()
+    for(index <- 0 until templocalityResult.size) {
+      if (templocalityResult(index).isEmpty) {
+        emptyHosts += tempNoEmptyHosts(index)._1
+      } else {
+        noEmptyHosts += tempNoEmptyHosts(index)._1
+        localityResult += templocalityResult(index)
+      }
+    }
+    // 2. do no locality repartition
+    // assign no locality partitions to all hosts
+    val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult)
+
+    // 3. generate CoalescedRDDPartition
+    (0 until localityResult.length + noLocalityResult.length).map { index =>
+      val ids = if (index < localityResult.length) {
+        localityResult(index).toArray
+      } else {
+        noLocalityResult(index - localityResult.length).toArray
+      }
+      val loc = if (index < localityResult.length) {
+        Some(noEmptyHosts(index))
+      } else {
+        Some(emptyHosts(index - localityResult.length))
+      }
+      LOGGER.info(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
+      new CoalescedRDDPartition(index, prev, ids, loc)
+    }.filter(_.parentsIndices.nonEmpty).toArray
+
+  }
+
+  def run(): Array[Partition] = {
+    // 1. group partitions by node
+    groupByNode()
+    LOGGER.info(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}")
+    val partitions = if (noLocality) {
+      // 2.A no locality partition
+      repartitionNoLocality()
+    } else {
+      // 2.B locality partition
+      repartitionLocality()
+    }
+    DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions)
+    partitions
+  }
+}
+
+object DataLoadPartitionCoalescer {
+  def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = {
+    prev.context.getPreferredLocs(prev, p.index)
+  }
+
+  def getParentsIndices(p: Partition): Array[Int] = {
+    p.asInstanceOf[CoalescedRDDPartition].parentsIndices
+  }
+
+  def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+    val prevPartIds = new ArrayBuffer[Int]
+    parts.foreach{ p =>
+      prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p)
+    }
+    // every parent partition must be assigned exactly once.
+    assert(prevPartIds.size == prevParts.size)
+    val prevPartIdsMap = prevPartIds.map{ id =>
+      (id, id)
+    }.toMap
+    prevParts.foreach{ p =>
+      prevPartIdsMap.get(p.index) match {
+        case None => assert(false, "partition " + p.index + " not found")
+        case Some(_) =>
+      }
+    }
+  }
+}

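For reference, a minimal sketch of how the coalescer might be driven from a parent RDD; the wrapper method and the node list below are illustrative and not part of this commit:

    import org.apache.spark.Partition
    import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}

    // coalesce the partitions of a parent RDD down to at most one partition per node
    def coalesceForLoad(parent: RDD[_], activeNodes: Array[String]): Array[Partition] = {
      // run() groups parent partitions by locality and returns CoalescedRDDPartition instances
      new DataLoadPartitionCoalescer(parent, activeNodes).run()
    }
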
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
new file mode 100644
index 0000000..e23b58d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.tasks
+
+import java.io.IOException
+
+import scala.collection.mutable
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.spark.rdd.DictionaryLoadModel
+
+/**
+ * Writes the distinct values of a column to the column's dictionary file.
+ *
+ * @param valuesBuffer distinct values collected for the column
+ * @param dictionary   existing dictionary of the column
+ * @param model        dictionary load model
+ * @param columnIndex  index of the column in the model
+ * @param writer       dictionary writer, created inside execute()
+ */
+class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
+    dictionary: Dictionary,
+    model: DictionaryLoadModel, columnIndex: Int,
+    var writer: CarbonDictionaryWriter = null) {
+
+  /**
+   * execute the task
+   *
+   * @return the list of distinct values written to the dictionary file
+   */
+  def execute(): java.util.List[String] = {
+    val values = valuesBuffer.toArray
+    java.util.Arrays.sort(values, Ordering[String])
+    val dictService = CarbonCommonFactory.getDictionaryService
+    writer = dictService.getDictionaryWriter(
+      model.table,
+      model.columnIdentifier(columnIndex),
+      model.hdfsLocation)
+    val distinctValues: java.util.List[String] = new java.util.ArrayList()
+
+    try {
+      if (!model.dictFileExists(columnIndex)) {
+        writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+        distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+      }
+
+      if (values.length >= 1) {
+        if (model.dictFileExists(columnIndex)) {
+          for (value <- values) {
+            val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
+                model.primDimensions(columnIndex))
+            if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
+              CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+              writer.write(parsedValue)
+              distinctValues.add(parsedValue)
+            }
+          }
+
+        } else {
+          for (value <- values) {
+            val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
+                model.primDimensions(columnIndex))
+            if (null != parsedValue) {
+              writer.write(parsedValue)
+              distinctValues.add(parsedValue)
+            }
+          }
+        }
+      }
+    } catch {
+      case ex: IOException =>
+        throw ex
+    } finally {
+      if (null != writer) {
+        writer.close()
+      }
+    }
+    distinctValues
+  }
+
+  /**
+   * update dictionary metadata
+   */
+  def updateMetaData() {
+    if (null != writer) {
+      writer.commit()
+    }
+  }
+}

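A rough sketch of the expected call sequence for this task, assuming the distinct values, dictionary and DictionaryLoadModel have already been prepared by a caller that is not part of this diff:

    import scala.collection.mutable
    import org.apache.carbondata.core.cache.dictionary.Dictionary
    import org.apache.carbondata.spark.rdd.DictionaryLoadModel
    import org.apache.carbondata.spark.tasks.DictionaryWriterTask

    def writeColumnDictionary(values: mutable.HashSet[String],
        dictionary: Dictionary,
        model: DictionaryLoadModel,
        columnIndex: Int): java.util.List[String] = {
      val task = new DictionaryWriterTask(values, dictionary, model, columnIndex)
      // execute() writes the new distinct values and returns them
      val distinctValues = task.execute()
      // commit the dictionary metadata only after a successful write
      task.updateMetaData()
      distinctValues
    }
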
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
new file mode 100644
index 0000000..d552331
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.tasks
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.apache.carbondata.spark.rdd.DictionaryLoadModel
+
+/**
+ * This task writes the sort index file of a dictionary column.
+ *
+ * @param model                           dictionary load model
+ * @param index                           index of the column in the model
+ * @param dictionary                      dictionary of the column
+ * @param distinctValues                  newly added distinct values of the column
+ * @param carbonDictionarySortIndexWriter writer for the sort index file
+ */
+class SortIndexWriterTask(model: DictionaryLoadModel,
+    index: Int,
+    dictionary: Dictionary,
+    distinctValues: java.util.List[String],
+    var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null) {
+  def execute() {
+    try {
+      if (distinctValues.size() > 0) {
+        val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
+        val dictService = CarbonCommonFactory.getDictionaryService
+        val dictionarySortInfo: CarbonDictionarySortInfo =
+          preparator.getDictionarySortInfo(distinctValues, dictionary,
+            model.primDimensions(index).getDataType)
+        carbonDictionarySortIndexWriter =
+          dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
+            model.hdfsLocation)
+        carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+        carbonDictionarySortIndexWriter
+          .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
+      }
+    } finally {
+      if (null != carbonDictionarySortIndexWriter) {
+        carbonDictionarySortIndexWriter.close()
+      }
+    }
+  }
+}

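The sort index task is meant to run right after the dictionary writer task for the same column; a minimal sketch of that chaining (the wrapper method is illustrative only):

    import scala.collection.mutable
    import org.apache.carbondata.core.cache.dictionary.Dictionary
    import org.apache.carbondata.spark.rdd.DictionaryLoadModel
    import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}

    def writeDictionaryAndSortIndex(values: mutable.HashSet[String],
        dictionary: Dictionary,
        model: DictionaryLoadModel,
        columnIndex: Int): Unit = {
      val writerTask = new DictionaryWriterTask(values, dictionary, model, columnIndex)
      val distinctValues = writerTask.execute()
      // SortIndexWriterTask rewrites the sort index only when new values were added
      new SortIndexWriterTask(model, columnIndex, dictionary, distinctValues).execute()
      writerTask.updateMetaData()
    }
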
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
new file mode 100644
index 0000000..dc63186
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.io.File
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonDataType}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonScalaUtil {
+  def convertSparkToCarbonDataType(
+      dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
+    dataType match {
+      case StringType => CarbonDataType.STRING
+      case ShortType => CarbonDataType.SHORT
+      case IntegerType => CarbonDataType.INT
+      case LongType => CarbonDataType.LONG
+      case DoubleType => CarbonDataType.DOUBLE
+      case FloatType => CarbonDataType.FLOAT
+      case DateType => CarbonDataType.DATE
+      case BooleanType => CarbonDataType.BOOLEAN
+      case TimestampType => CarbonDataType.TIMESTAMP
+      case ArrayType(_, _) => CarbonDataType.ARRAY
+      case StructType(_) => CarbonDataType.STRUCT
+      case NullType => CarbonDataType.NULL
+      case _ => CarbonDataType.DECIMAL
+    }
+  }
+
+  def convertSparkToCarbonSchemaDataType(dataType: String): String = {
+    dataType match {
+      case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING
+      case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER
+      case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER
+      case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT
+      case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC
+      case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC
+      case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC
+      case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC
+      case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING
+      case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING
+      case CarbonCommonConstants.TIMESTAMP_TYPE => CarbonCommonConstants.TIMESTAMP
+      case anyType => anyType
+    }
+  }
+
+  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
+    dataType match {
+      case CarbonDataType.STRING => StringType
+      case CarbonDataType.SHORT => ShortType
+      case CarbonDataType.INT => IntegerType
+      case CarbonDataType.LONG => LongType
+      case CarbonDataType.DOUBLE => DoubleType
+      case CarbonDataType.BOOLEAN => BooleanType
+      case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
+      case CarbonDataType.TIMESTAMP => TimestampType
+    }
+  }
+
+  def updateDataType(
+      currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
+    currentDataType match {
+      case decimal: DecimalType =>
+        // the matched value is already a DecimalType, so no cast is needed
+        DecimalType(DecimalType.MAX_PRECISION, decimal.scale)
+      case _ =>
+        currentDataType
+    }
+  }
+
+  def getKettleHome(sqlContext: SQLContext): String = {
+    var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
+    if (null == kettleHomePath) {
+      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
+    }
+    if (null == kettleHomePath) {
+      val carbonHome = System.getenv("CARBON_HOME")
+      if (null != carbonHome) {
+        kettleHomePath = carbonHome + "/processing/carbonplugins"
+      }
+    }
+    if (kettleHomePath != null) {
+      val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase()
+      // if the spark master is local, the executor runs on the local machine,
+      // so the kettle home path may need to be corrected
+      if (sparkMaster.startsWith("local")) {
+        val kettleHomeFileType = FileFactory.getFileType(kettleHomePath)
+        val kettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, kettleHomeFileType)
+        // check whether the carbon.kettle.home path exists
+        if (!kettleHomeFile.exists()) {
+          // get the path of this class
+          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-
+          // xxx.jar!/org/carbondata/spark/rdd/
+          var jarFilePath = this.getClass.getResource("").getPath
+          val endIndex = jarFilePath.indexOf(".jar!") + 4
+          // get the jar file path
+          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-*.jar
+          jarFilePath = jarFilePath.substring(0, endIndex)
+          val jarFileType = FileFactory.getFileType(jarFilePath)
+          val jarFile = FileFactory.getCarbonFile(jarFilePath, jarFileType)
+          // get the parent folder of the jar file
+          // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib
+          val carbonLibPath = jarFile.getParentFile.getPath
+          // find the kettle home under the parent folder
+          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbonplugins
+          kettleHomePath = carbonLibPath + File.separator + CarbonCommonConstants.KETTLE_HOME_NAME
+          val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          logger.error(s"carbon.kettle.home path does not exist, resetting it to $kettleHomePath")
+          val newKettleHomeFileType = FileFactory.getFileType(kettleHomePath)
+          val newKettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, newKettleHomeFileType)
+          // check if the found kettle home exists
+          if (!newKettleHomeFile.exists()) {
+            sys.error("Kettle home not found. Failed to reset carbon.kettle.home")
+          }
+        }
+      }
+    } else {
+      sys.error("carbon.kettle.home is not set")
+    }
+    kettleHomePath
+  }
+
+  def getString(value: Any,
+      serializationNullFormat: String,
+      delimiterLevel1: String,
+      delimiterLevel2: String,
+      format: SimpleDateFormat,
+      level: Int = 1): String = {
+    if (value == null) {
+      serializationNullFormat
+    } else {
+      value match {
+        case s: String => s
+        case d: java.math.BigDecimal => d.toPlainString
+        case i: java.lang.Integer => i.toString
+        case d: java.lang.Double => d.toString
+        case t: java.sql.Timestamp => format format t
+        case d: java.sql.Date => format format d
+        case b: java.lang.Boolean => b.toString
+        case s: java.lang.Short => s.toString
+        case f: java.lang.Float => f.toString
+        case bs: Array[Byte] => new String(bs)
+        case s: scala.collection.Seq[Any] =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          s.foreach { x =>
+            builder.append(getString(x, serializationNullFormat, delimiterLevel1,
+                delimiterLevel2, format, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case m: scala.collection.Map[Any, Any] =>
+          throw new Exception("Unsupported data type: Map")
+        case r: org.apache.spark.sql.Row =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          for (i <- 0 until r.length) {
+            builder.append(getString(r(i), serializationNullFormat, delimiterLevel1,
+                delimiterLevel2, format, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case other => other.toString
+      }
+    }
+  }
+}

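A small illustration of how CarbonScalaUtil.getString flattens nested values into delimited text; the delimiters and null format below are chosen for the example only:

    import java.text.SimpleDateFormat
    import org.apache.carbondata.spark.util.CarbonScalaUtil

    val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // a Seq is joined with the level-1 delimiter, nested Seqs/Rows with the level-2 delimiter
    val flattened = CarbonScalaUtil.getString(
      Seq("a", "b", Seq("c", "d")),
      serializationNullFormat = "\\N",
      delimiterLevel1 = "$",
      delimiterLevel2 = ":",
      format = timestampFormat)
    // flattened == "a$b$c:d"
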
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
new file mode 100644
index 0000000..1c9d774
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.execution.command.ColumnProperty
+import org.apache.spark.sql.execution.command.Field
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+object CommonUtil {
+  def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String],
+      msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) {
+    val colGrpCols = colGroup.split(',').map(_.trim)
+    colGrpCols.foreach { x =>
+      // if column is no dictionary
+      if (noDictionaryDims.contains(x)) {
+        throw new MalformedCarbonCommandException(
+          "Column group is not supported for no dictionary columns:" + x)
+      } else if (msrs.exists(msr => msr.column.equals(x))) {
+        // if column is measure
+        throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x)
+      } else if (foundIndExistingColGrp(x)) {
+        throw new MalformedCarbonCommandException("Column is available in other column group:" + x)
+      } else if (isComplex(x, dims)) {
+        throw new MalformedCarbonCommandException(
+          "Column group doesn't support Complex column:" + x)
+      } else if (isTimeStampColumn(x, dims)) {
+        throw new MalformedCarbonCommandException(
+          "Column group doesn't support Timestamp datatype:" + x)
+      } else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
+        // the column in the column group is not a valid dimension column
+        throw new MalformedCarbonCommandException(
+          "column in column group is not a valid column: " + x
+        )
+      }
+    }
+    // check if given column is present in other groups
+    def foundIndExistingColGrp(colName: String): Boolean = {
+      retrievedColGrps.foreach { colGrp =>
+        if (colGrp.split(",").contains(colName)) {
+          return true
+        }
+      }
+      false
+    }
+
+  }
+
+
+  def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = {
+    dims.foreach { dim =>
+      if (dim.column.equalsIgnoreCase(colName)) {
+        if (dim.dataType.isDefined && null != dim.dataType.get &&
+            "timestamp".equalsIgnoreCase(dim.dataType.get)) {
+          return true
+        }
+      }
+    }
+    false
+  }
+
+  def isComplex(colName: String, dims: Seq[Field]): Boolean = {
+    dims.foreach { x =>
+      if (x.children.isDefined && null != x.children.get && x.children.get.nonEmpty) {
+        val children = x.children.get
+        if (x.column.equals(colName)) {
+          return true
+        } else {
+          children.foreach { child =>
+            val fieldName = x.column + "." + child.column
+            if (fieldName.equalsIgnoreCase(colName)) {
+              return true
+            }
+          }
+        }
+      }
+    }
+    false
+  }
+
+  def getColumnProperties(column: String,
+      tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = {
+    val fieldProps = new util.ArrayList[ColumnProperty]()
+    val columnPropertiesStartKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
+    tableProperties.foreach {
+      case (key, value) =>
+        if (key.startsWith(columnPropertiesStartKey)) {
+          fieldProps.add(ColumnProperty(key.substring(columnPropertiesStartKey.length(),
+            key.length()), value))
+        }
+    }
+    if (fieldProps.isEmpty) {
+      None
+    } else {
+      Some(fieldProps)
+    }
+  }
+
+  def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = {
+    val itr = tableProperties.keys
+    var isValid: Boolean = true
+    tableProperties.foreach {
+      case (key, value) =>
+        if (!validateFields(key, fields)) {
+          isValid = false
+          throw new MalformedCarbonCommandException(s"Invalid table properties ${ key }")
+        }
+    }
+    isValid
+  }
+
+  def validateFields(key: String, fields: Seq[Field]): Boolean = {
+    var isValid: Boolean = false
+    fields.foreach { field =>
+      if (field.children.isDefined && field.children.get != null) {
+        field.children.foreach(fields => {
+          fields.foreach(complexfield => {
+            val column = if ("val" == complexfield.column) {
+              field.column
+            } else {
+              field.column + "." + complexfield.column
+            }
+            if (validateColumnProperty(key, column)) {
+              isValid = true
+            }
+          }
+          )
+        }
+        )
+      } else {
+        if (validateColumnProperty(key, field.column)) {
+          isValid = true
+        }
+      }
+
+    }
+    isValid
+  }
+
+  def validateColumnProperty(key: String, column: String): Boolean = {
+    if (!key.startsWith(CarbonCommonConstants.COLUMN_PROPERTIES)) {
+      return true
+    }
+    val columnPropertyKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
+    if (key.startsWith(columnPropertyKey)) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * @param colGrps column groups defined in the table properties
+   * @param dims    dimension fields of the table
+   * @return column groups arranged in schema order
+   */
+  def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): Seq[String] = {
+    def sortByIndex(colGrp1: String, colGrp2: String) = {
+      val firstCol1 = colGrp1.split(",")(0)
+      val firstCol2 = colGrp2.split(",")(0)
+      val dimIndex1: Int = getDimIndex(firstCol1, dims)
+      val dimIndex2: Int = getDimIndex(firstCol2, dims)
+      dimIndex1 < dimIndex2
+    }
+    val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex)
+    sortedColGroups
+  }
+
+  /**
+   * @param colName column name to look up
+   * @param dims    dimension fields of the table
+   * @return index of the given column in dims, or -1 if it is not found
+   */
+  def getDimIndex(colName: String, dims: Seq[Field]): Int = {
+    var index: Int = -1
+    dims.zipWithIndex.foreach { h =>
+      if (h._1.column.equalsIgnoreCase(colName)) {
+        index = h._2.toInt
+      }
+    }
+    index
+  }
+
+  /**
+   * This method will validate the table block size specified by the user
+   *
+   * @param tableProperties table properties map that may contain the block size
+   */
+  def validateTableBlockSize(tableProperties: Map[String, String]): Unit = {
+    var tableBlockSize: Integer = 0
+    if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) {
+      val blockSizeStr: String =
+        parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE))
+      try {
+        tableBlockSize = Integer.parseInt(blockSizeStr)
+      } catch {
+        case e: NumberFormatException =>
+          throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
+                                                    s"$blockSizeStr, only int value from 1 MB to " +
+                                                    s"2048 MB is supported.")
+      }
+      if (tableBlockSize < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL ||
+          tableBlockSize > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) {
+        throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
+                                                  s"$blockSizeStr, only int value from 1 MB to " +
+                                                  s"2048 MB is supported.")
+      }
+      tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, blockSizeStr)
+    }
+  }
+
+  /**
+   * This method parses a configured size string such as 'XX MB' or 'XX M' into 'XX'
+   *
+   * @param propertyValueString the configured size string
+   */
+  def parsePropertyValueStringInMB(propertyValueString: String): String = {
+    var parsedPropertyValueString: String = propertyValueString
+    if (propertyValueString.trim.toLowerCase.endsWith("mb")) {
+      parsedPropertyValueString = propertyValueString.trim.toLowerCase
+        .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("mb")).trim
+    }
+    if (propertyValueString.trim.toLowerCase.endsWith("m")) {
+      parsedPropertyValueString = propertyValueString.trim.toLowerCase
+        .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("m")).trim
+    }
+    parsedPropertyValueString
+  }
+
+  def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
+    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
+    val details = SegmentStatusManager.readLoadMetadata(metadataPath)
+    model.setLoadMetadataDetails(details.toList.asJava)
+  }
+}

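A quick sketch of how the block-size helpers behave, with an invented property value:

    import scala.collection.mutable
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.spark.util.CommonUtil

    // "512 MB" is normalized to "512"; a non-numeric or out-of-range value makes
    // validateTableBlockSize throw MalformedCarbonCommandException
    val tableProperties = mutable.Map(CarbonCommonConstants.TABLE_BLOCKSIZE -> "512 MB")
    CommonUtil.validateTableBlockSize(tableProperties)
    // tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE) is now "512"
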
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
new file mode 100644
index 0000000..5ec96df
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+
+object DataTypeConverterUtil {
+  def convertToCarbonType(dataType: String): DataType = {
+    dataType.toLowerCase match {
+      case "string" => DataType.STRING
+      case "int" => DataType.INT
+      case "integer" => DataType.INT
+      case "tinyint" => DataType.SHORT
+      case "short" => DataType.SHORT
+      case "long" => DataType.LONG
+      case "bigint" => DataType.LONG
+      case "numeric" => DataType.DOUBLE
+      case "double" => DataType.DOUBLE
+      case "decimal" => DataType.DECIMAL
+      case "timestamp" => DataType.TIMESTAMP
+      case "array" => DataType.ARRAY
+      case "struct" => DataType.STRUCT
+      case _ => convertToCarbonTypeForSpark2(dataType)
+    }
+  }
+
+  def convertToCarbonTypeForSpark2(dataType: String): DataType = {
+    dataType.toLowerCase match {
+      case "stringtype" => DataType.STRING
+      case "inttype" => DataType.INT
+      case "integertype" => DataType.INT
+      case "tinyinttype" => DataType.SHORT
+      case "shorttype" => DataType.SHORT
+      case "longtype" => DataType.LONG
+      case "biginttype" => DataType.LONG
+      case "numerictype" => DataType.DOUBLE
+      case "doubletype" => DataType.DOUBLE
+      case "decimaltype" => DataType.DECIMAL
+      case "timestamptype" => DataType.TIMESTAMP
+      case "arraytype" => DataType.ARRAY
+      case "structtype" => DataType.STRUCT
+      case _ => sys.error(s"Unsupported data type: $dataType")
+    }
+  }
+
+  def convertToString(dataType: DataType): String = {
+    dataType match {
+      case DataType.STRING => "string"
+      case DataType.SHORT => "smallint"
+      case DataType.INT => "int"
+      case DataType.LONG => "bigint"
+      case DataType.DOUBLE => "double"
+      case DataType.DECIMAL => "decimal"
+      case DataType.TIMESTAMP => "timestamp"
+      case DataType.ARRAY => "array"
+      case DataType.STRUCT => "struct"
+    }
+  }
+}

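A minimal illustration of the two-step type resolution above (plain SQL type names first, Spark 2 style type names as the fallback):

    import org.apache.carbondata.core.carbon.metadata.datatype.DataType
    import org.apache.carbondata.spark.util.DataTypeConverterUtil

    assert(DataTypeConverterUtil.convertToCarbonType("bigint") == DataType.LONG)
    // Spark 2 style names fall through to convertToCarbonTypeForSpark2
    assert(DataTypeConverterUtil.convertToCarbonType("StringType") == DataType.STRING)
    assert(DataTypeConverterUtil.convertToString(DataType.SHORT) == "smallint")
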
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
new file mode 100644
index 0000000..e650bfe
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -0,0 +1,843 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.io.{FileNotFoundException, IOException}
+import java.nio.charset.Charset
+import java.util.regex.Pattern
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.language.implicitConversions
+import scala.util.control.Breaks.{break, breakable}
+
+import org.apache.commons.lang3.{ArrayUtils, StringUtils}
+import org.apache.spark.Accumulator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.reader.CarbonDictionaryReader
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.CarbonSparkFactory
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd._
+
+/**
+ * An object that provides methods to generate global dictionaries from CSV files.
+ */
+object GlobalDictionaryUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The default separator to use if none is supplied in the load model.
+   */
+  val DEFAULT_SEPARATOR: Char = ','
+  /**
+   * The default quote character to use if none is supplied in the
+   * load model.
+   */
+  val DEFAULT_QUOTE_CHARACTER: Char = '"'
+
+  /**
+   * find the columns for which a global dictionary needs to be generated.
+   *
+   * @param dimensions dimension list of schema
+   * @param headers    column headers
+   * @param columns    column list of csv file
+   */
+  def pruneDimensions(dimensions: Array[CarbonDimension],
+      headers: Array[String],
+      columns: Array[String]): (Array[CarbonDimension], Array[String]) = {
+    val dimensionBuffer = new ArrayBuffer[CarbonDimension]
+    val columnNameBuffer = new ArrayBuffer[String]
+    val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY,
+      Encoding.DIRECT_DICTIONARY))
+    dimensionsWithDict.foreach { dim =>
+      breakable {
+        headers.zipWithIndex.foreach { h =>
+          if (dim.getColName.equalsIgnoreCase(h._1)) {
+            dimensionBuffer += dim
+            columnNameBuffer += columns(h._2)
+            break
+          }
+        }
+      }
+    }
+    (dimensionBuffer.toArray, columnNameBuffer.toArray)
+  }
+
+  /**
+   * use this method to check whether a CarbonDimension uses a given encoding
+   *
+   * @param dimension       the CarbonDimension to check
+   * @param encoding        the encoding to look for
+   * @param excludeEncoding the encoding that must not be present
+   */
+  def hasEncoding(dimension: CarbonDimension,
+      encoding: Encoding,
+      excludeEncoding: Encoding): Boolean = {
+    if (dimension.isComplex()) {
+      val children = dimension.getListOfChildDimensions
+      children.asScala.exists(hasEncoding(_, encoding, excludeEncoding))
+    } else {
+      dimension.hasEncoding(encoding) &&
+      (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))
+    }
+  }
+
+  def gatherDimensionByEncoding(carbonLoadModel: CarbonLoadModel,
+      dimension: CarbonDimension,
+      encoding: Encoding,
+      excludeEncoding: Encoding,
+      dimensionsWithEncoding: ArrayBuffer[CarbonDimension],
+      forPreDefDict: Boolean) {
+    if (dimension.isComplex) {
+      val children = dimension.getListOfChildDimensions.asScala
+      children.foreach { c =>
+        gatherDimensionByEncoding(carbonLoadModel, c, encoding, excludeEncoding,
+          dimensionsWithEncoding, forPreDefDict)
+      }
+    } else {
+      if (dimension.hasEncoding(encoding) &&
+          (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) {
+        if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) ||
+            (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
+          dimensionsWithEncoding += dimension
+        }
+      }
+    }
+  }
+
+  def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel,
+      dimension: CarbonDimension,
+      forPreDefDict: Boolean): Array[CarbonDimension] = {
+    val dimensionsWithDict = new ArrayBuffer[CarbonDimension]
+    gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY,
+      Encoding.DIRECT_DICTIONARY,
+      dimensionsWithDict, forPreDefDict)
+    dimensionsWithDict.toArray
+  }
+
+  /**
+   * invoke CarbonDictionaryWriter to write dictionary to file.
+   *
+   * @param model       instance of DictionaryLoadModel
+   * @param columnIndex the index of current column in column list
+   * @param iter        distinct value list of dictionary
+   */
+  def writeGlobalDictionaryToFile(model: DictionaryLoadModel,
+      columnIndex: Int,
+      iter: Iterator[String]): Unit = {
+    val dictService = CarbonCommonFactory.getDictionaryService
+    val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
+      model.table,
+      model.columnIdentifier(columnIndex),
+      model.hdfsLocation
+    )
+    try {
+      while (iter.hasNext) {
+        writer.write(iter.next)
+      }
+    } finally {
+      writer.close()
+    }
+  }
+
+  /**
+   * read global dictionary from cache
+   */
+  def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = {
+    val dictMap = new HashMap[String, Dictionary]
+    model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m =>
+      val dict = CarbonLoaderUtil.getDictionary(model.table,
+        m._1.getColumnIdentifier, model.hdfsLocation,
+        m._1.getDataType
+      )
+      dictMap.put(m._1.getColumnId, dict)
+    }
+    dictMap
+  }
+
+  /**
+   * invoke CarbonDictionaryReader to read dictionary from files.
+   *
+   * @param model carbon dictionary load model
+   */
+  def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = {
+    val dictMap = new HashMap[String, HashSet[String]]
+    val dictService = CarbonCommonFactory.getDictionaryService
+    for (i <- model.primDimensions.indices) {
+      val set = new HashSet[String]
+      if (model.dictFileExists(i)) {
+        val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table,
+          model.columnIdentifier(i), model.hdfsLocation
+        )
+        val values = reader.read
+        if (values != null) {
+          for (j <- 0 until values.size) {
+            set.add(new String(values.get(j),
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
+          }
+        }
+      }
+      dictMap.put(model.primDimensions(i).getColumnId, set)
+    }
+    dictMap
+  }
+
+  def generateParserForChildrenDimension(dim: CarbonDimension,
+      format: DataFormat,
+      mapColumnValuesWithId:
+      HashMap[String, HashSet[String]],
+      generic: GenericParser): Unit = {
+    val children = dim.getListOfChildDimensions.asScala
+    for (i <- children.indices) {
+      generateParserForDimension(Some(children(i)), format.cloneAndIncreaseIndex,
+        mapColumnValuesWithId) match {
+        case Some(childDim) =>
+          generic.addChild(childDim)
+        case None =>
+      }
+    }
+  }
+
+  def generateParserForDimension(dimension: Option[CarbonDimension],
+      format: DataFormat,
+      mapColumnValuesWithId: HashMap[String, HashSet[String]]): Option[GenericParser] = {
+    dimension match {
+      case None =>
+        None
+      case Some(dim) =>
+        dim.getDataType match {
+          case DataType.ARRAY =>
+            val arrDim = ArrayParser(dim, format)
+            generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, arrDim)
+            Some(arrDim)
+          case DataType.STRUCT =>
+            val stuDim = StructParser(dim, format)
+            generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, stuDim)
+            Some(stuDim)
+          case _ =>
+            Some(PrimitiveParser(dim, mapColumnValuesWithId.get(dim.getColumnId)))
+        }
+    }
+  }
+
+  def createDataFormat(delimiters: Array[String]): DataFormat = {
+    if (ArrayUtils.isNotEmpty(delimiters)) {
+      val patterns = delimiters.map { d =>
+        Pattern.compile(if (d == null) {
+          ""
+        } else {
+          d
+        })
+      }
+      DataFormat(delimiters, 0, patterns)
+    } else {
+      null
+    }
+  }
+
+  def isHighCardinalityColumn(columnCardinality: Int,
+      rowCount: Long,
+      model: DictionaryLoadModel): Boolean = {
+    (columnCardinality > model.highCardThreshold) &&
+    (rowCount > 0) &&
+    (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
+  }
+
+  /**
+   * create an instance of DictionaryLoadModel
+   *
+   * @param carbonLoadModel carbon load model
+   * @param table           CarbonTableIdentifier
+   * @param dimensions      column list
+   * @param hdfsLocation    store location in HDFS
+   * @param dictfolderPath  path of dictionary folder
+   */
+  def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
+      table: CarbonTableIdentifier,
+      dimensions: Array[CarbonDimension],
+      hdfsLocation: String,
+      dictfolderPath: String,
+      forPreDefDict: Boolean): DictionaryLoadModel = {
+    val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
+    val isComplexes = new ArrayBuffer[Boolean]
+    for (i <- dimensions.indices) {
+      val dims = getPrimDimensionWithDict(carbonLoadModel, dimensions(i), forPreDefDict)
+      for (j <- dims.indices) {
+        primDimensionsBuffer += dims(j)
+        isComplexes += dimensions(i).isComplex
+      }
+    }
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
+    val primDimensions = primDimensionsBuffer.map { x => x }.toArray
+    val dictDetail = CarbonSparkFactory.getDictionaryDetailService().
+      getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+    val dictFilePaths = dictDetail.dictFilePaths
+    val dictFileExists = dictDetail.dictFileExists
+    val columnIdentifier = dictDetail.columnIdentifiers
+    val hdfsTempLocation = CarbonProperties.getInstance.
+      getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir"))
+    val lockType = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
+    val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
+    // load the high-cardinality identification configuration
+    val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+      CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
+    val highCardThreshold = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+      CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
+    val rowCountPercentage = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+      CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
+
+    val serializationNullFormat =
+      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+    // read load metadata details if not loaded yet (used to detect the first load)
+    if (null == carbonLoadModel.getLoadMetadataDetails) {
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
+    }
+    new DictionaryLoadModel(table,
+      dimensions,
+      hdfsLocation,
+      dictfolderPath,
+      dictFilePaths,
+      dictFileExists,
+      isComplexes.toArray,
+      primDimensions,
+      carbonLoadModel.getDelimiters,
+      highCardIdentifyEnable,
+      highCardThreshold,
+      rowCountPercentage,
+      columnIdentifier,
+      carbonLoadModel.getLoadMetadataDetails.size() == 0,
+      hdfsTempLocation,
+      lockType,
+      zookeeperUrl,
+      serializationNullFormat)
+  }
+
+  /**
+   * load CSV files into a DataFrame by using the datasource "com.databricks.spark.csv.newapi"
+   *
+   * @param sqlContext      SQLContext
+   * @param carbonLoadModel carbon data load model
+   */
+  def loadDataFrame(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel): DataFrame = {
+    val df = sqlContext.read
+      .format("com.databricks.spark.csv.newapi")
+      .option("header", {
+        if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+          "true"
+        } else {
+          "false"
+        }
+      })
+      .option("delimiter", {
+        if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
+          "" + DEFAULT_SEPARATOR
+        } else {
+          carbonLoadModel.getCsvDelimiter
+        }
+      })
+      .option("parserLib", "univocity")
+      .option("escape", carbonLoadModel.getEscapeChar)
+      .option("ignoreLeadingWhiteSpace", "false")
+      .option("ignoreTrailingWhiteSpace", "false")
+      .option("codec", "gzip")
+      .option("quote", {
+        if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
+          "" + DEFAULT_QUOTE_CHARACTER
+        } else {
+          carbonLoadModel.getQuoteChar
+        }
+      })
+      .option("comment", carbonLoadModel.getCommentChar)
+      .load(carbonLoadModel.getFactFilePath)
+    df
+  }
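+  // Illustrative usage (a sketch, assuming a CarbonLoadModel prepared by the load command):
+  //   val df = loadDataFrame(sqlContext, carbonLoadModel)
+  // when no CSV header is configured on the model, the first line of the fact file is read as
+  // the header ("header" -> "true"); otherwise the first line is treated as data and the header
+  // configured in the load model is applied by the caller.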
+
+  // Hack for spark2 integration
+  var updateTableMetadataFunc: (CarbonLoadModel, SQLContext, DictionaryLoadModel,
+      Array[CarbonDimension]) => Unit = _
+
+  /**
+   * check whether the global dictionary has been generated successfully or not
+   *
+   * @param status per-column generation status: (dimension index, load status,
+   *               whether the column should be treated as a no-dictionary column)
+   */
+  private def checkStatus(carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      status: Array[(Int, String, Boolean)]) = {
+    var result = false
+    val noDictionaryColumns = new ArrayBuffer[CarbonDimension]
+    val tableName = model.table.getTableName
+    status.foreach { x =>
+      val columnName = model.primDimensions(x._1).getColName
+      if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) {
+        result = true
+        LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed")
+      }
+      if (x._3) {
+        noDictionaryColumns += model.primDimensions(x._1)
+      }
+    }
+    if (noDictionaryColumns.nonEmpty) {
+      updateTableMetadataFunc(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray)
+    }
+    if (result) {
+      LOGGER.error("generate global dictionary files failed")
+      throw new Exception("Failed to generate global dictionary files")
+    } else {
+      LOGGER.info("generate global dictionary successfully")
+    }
+  }
+
+  /**
+   * get external columns and their dictionary file paths
+   *
+   * @param colDictFilePath external column dict file path
+   * @param table           table identifier
+   * @param dimensions      dimension columns
+   */
+  private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel,
+      colDictFilePath: String,
+      table: CarbonTableIdentifier,
+      dimensions: Array[CarbonDimension]) = {
+    val colFileMapArray = colDictFilePath.split(",")
+    for (colPathMap <- colFileMapArray) {
+      val colPathMapTrim = colPathMap.trim
+      val colNameWithPath = colPathMapTrim.split(":")
+      if (colNameWithPath.length == 1) {
+        LOGGER.error("the format of external column dictionary should be " +
+                     "columnName:columnPath, please check")
+        throw new DataLoadingException("the format of predefined column dictionary" +
+                                       " should be columnName:columnPath, please check")
+      }
+      setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0),
+        FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1)))
+    }
+  }
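+  // Illustrative sketch (hypothetical paths): a COLUMNDICT value such as
+  //   "country:/dict/country.csv, region.name:/dict/region_name.csv"
+  // is split on "," and then at the first ":" of each entry into a (columnName, columnPath)
+  // pair, so paths that themselves contain ":" (e.g. hdfs://...) remain intact.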
+
+  /**
+   * set predefined dictionary for a dimension
+   *
+   * @param dimensions    all the dimensions
+   * @param table         carbon table identifier
+   * @param colName       user specified column name for the predefined dict
+   * @param colDictPath   column dictionary file path
+   * @param parentDimName parent dimension for complex type
+   */
+  def setPredefineDict(carbonLoadModel: CarbonLoadModel,
+      dimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier,
+      colName: String,
+      colDictPath: String,
+      parentDimName: String = "") {
+    val middleDimName = colName.split("\\.")(0)
+    val dimParent = parentDimName + {
+      colName match {
+        case "" => colName
+        case _ =>
+          if (parentDimName.isEmpty) middleDimName else "." + middleDimName
+      }
+    }
+    // check whether the column exists
+    val preDictDimensionOption = dimensions.filter(
+      _.getColName.equalsIgnoreCase(dimParent))
+    if (preDictDimensionOption.length == 0) {
+      LOGGER.error(s"Column $dimParent is not a key column " +
+                   s"in ${ table.getDatabaseName }.${ table.getTableName }")
+      throw new DataLoadingException(s"Column $dimParent is not a key column. " +
+                                     s"Only key column can be part of dictionary " +
+                                     s"and used in COLUMNDICT option.")
+    }
+    val preDictDimension = preDictDimensionOption(0)
+    if (preDictDimension.isComplex) {
+      val children = preDictDimension.getListOfChildDimensions.asScala.toArray
+      // for Array, the user sets ArrayField: path, while ArrayField has a child Array.val
+      val currentColName = {
+        preDictDimension.getDataType match {
+          case DataType.ARRAY =>
+            if (children(0).isComplex) {
+              "val." + colName.substring(middleDimName.length + 1)
+            } else {
+              "val"
+            }
+          case _ => colName.substring(middleDimName.length + 1)
+        }
+      }
+      setPredefineDict(carbonLoadModel, children, table, currentColName,
+        colDictPath, dimParent)
+    } else {
+      carbonLoadModel.setPredefDictMap(preDictDimension, colDictPath)
+    }
+  }
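+  // Illustrative sketch (hypothetical schema): for an ARRAY column arrayField whose element is a
+  // STRUCT containing a field name, a user-facing colName of "arrayField.name" recurses into the
+  // children as "val.name" until a primitive dimension is reached, which is then mapped to
+  // colDictPath via setPredefDictMap.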
+
+  /**
+   * use external dimension column to generate global dictionary
+   *
+   * @param colDictFilePath external column dict file path
+   * @param table           table identifier
+   * @param dimensions      dimension column
+   * @param carbonLoadModel carbon load model
+   * @param sqlContext      spark sql context
+   * @param hdfsLocation    store location on hdfs
+   * @param dictFolderPath  generated global dict file path
+   */
+  private def generatePredefinedColDictionary(colDictFilePath: String,
+      table: CarbonTableIdentifier,
+      dimensions: Array[CarbonDimension],
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      hdfsLocation: String,
+      dictFolderPath: String) = {
+    // set pre defined dictionary column
+    setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
+    val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
+      hdfsLocation, dictFolderPath, forPreDefDict = true)
+    // new RDD to achieve distributed column dict generation
+    val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
+      sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
+      .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
+    val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
+    // check result status
+    checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
+  }
+
+  /**
+   * generate dimension parsers
+   *
+   * @param model              dictionary load model
+   * @param distinctValuesList out parameter collecting (dimension index, distinct value set) pairs
+   * @return dimensionParsers
+   */
+  def createDimensionParsers(model: DictionaryLoadModel,
+      distinctValuesList: ArrayBuffer[(Int, HashSet[String])]): Array[GenericParser] = {
+    // local combine set
+    val dimNum = model.dimensions.length
+    val primDimNum = model.primDimensions.length
+    val columnValues = new Array[HashSet[String]](primDimNum)
+    val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
+    for (i <- 0 until primDimNum) {
+      columnValues(i) = new HashSet[String]
+      distinctValuesList += ((i, columnValues(i)))
+      mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
+    }
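+    // note: each columnValues(i) is the same HashSet instance that was just appended to
+    // distinctValuesList and registered in mapColumnValuesWithId, so values later added to these
+    // sets through the parsers are visible via distinctValuesList without extra copying.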
+    val dimensionParsers = new Array[GenericParser](dimNum)
+    for (j <- 0 until dimNum) {
+      dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
+        Some(model.dimensions(j)),
+        GlobalDictionaryUtil.createDataFormat(model.delimiters),
+        mapColumnValuesWithId).get
+    }
+    dimensionParsers
+  }
+
+  /**
+   * parse a record of the dictionary file and validate it
+   *
+   * @param x              one line of the dictionary file
+   * @param accum          accumulator counting bad records
+   * @param csvFileColumns column names of the CSV file, indexed by position
+   */
+  private def parseRecord(x: String, accum: Accumulator[Int],
+      csvFileColumns: Array[String]): (String, String) = {
+    val tokens = x.split("" + DEFAULT_SEPARATOR)
+    var columnName: String = ""
+    var value: String = ""
+    // separator-only records such as ",": count them as bad records
+    if (tokens.isEmpty) {
+      LOGGER.error("Read a bad dictionary record: " + x)
+      accum += 1
+    } else if (tokens.size == 1) {
+      // single-token records such as "1" or "jone": bad unless a separator is present
+      if (!x.contains(",")) {
+        accum += 1
+      } else {
+        try {
+          columnName = csvFileColumns(tokens(0).toInt)
+        } catch {
+          case ex: Exception =>
+            LOGGER.error("Read a bad dictionary record: " + x)
+            accum += 1
+        }
+      }
+    } else {
+      try {
+        columnName = csvFileColumns(tokens(0).toInt)
+        value = tokens(1)
+      } catch {
+        case ex: Exception =>
+          LOGGER.error("Read a bad dictionary record: " + x)
+          accum += 1
+      }
+    }
+    (columnName, value)
+  }
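+  // Illustrative sketch (assuming the default ',' separator): with csvFileColumns =
+  // Array("id", "name"), the record "1,alice" parses to ("name", "alice"); malformed records
+  // such as "jone" (no separator) or "9,x" (column index out of range) return ("", "") and
+  // increment the accumulator.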
+
+  /**
+   * read all dictionary files and prune them to the required columns
+   *
+   * @param sqlContext        spark sql context
+   * @param csvFileColumns    column names of the CSV file
+   * @param requireColumns    columns that need global dictionary generation
+   * @param allDictionaryPath path of the dictionary files
+   * @param accumulator       accumulator counting bad dictionary records
+   * @return allDictionaryRdd
+   */
+  private def readAllDictionaryFiles(sqlContext: SQLContext,
+      csvFileColumns: Array[String],
+      requireColumns: Array[String],
+      allDictionaryPath: String,
+      accumulator: Accumulator[Int]) = {
+    var allDictionaryRdd: RDD[(String, Iterable[String])] = null
+    try {
+      // read the local dictionary files and split each line into (columnName, columnValue)
+      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
+        .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
+
+      // group by column index, and filter required columns
+      val requireColumnsList = requireColumns.toList
+      allDictionaryRdd = basicRdd
+        .groupByKey()
+        .filter(x => requireColumnsList.contains(x._1))
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("Read dictionary files failed. Caused by: " + ex.getMessage)
+        throw ex
+    }
+    allDictionaryRdd
+  }
+
+  /**
+   * validate the local dictionary files
+   *
+   * @param allDictionaryPath path (or wildcard pattern) of the dictionary files
+   * @return true if at least one non-empty dictionary file exists
+   */
+  private def validateAllDictionaryPath(allDictionaryPath: String): Boolean = {
+    val fileType = FileFactory.getFileType(allDictionaryPath)
+    val filePath = FileFactory.getCarbonFile(allDictionaryPath, fileType)
+    // the file path may be a wildcard pattern such as "/path/*.dictionary"
+    if (filePath.getName.startsWith("*")) {
+      val dictExt = filePath.getName.substring(1)
+      if (filePath.getParentFile.exists()) {
+        val listFiles = filePath.getParentFile.listFiles()
+        if (listFiles.exists(file =>
+          file.getName.endsWith(dictExt) && file.getSize > 0)) {
+          true
+        } else {
+          LOGGER.warn("No dictionary files found or empty dictionary files! " +
+                      "Won't generate new dictionary.")
+          false
+        }
+      } else {
+        throw new FileNotFoundException(
+          "The given dictionary file path is not found!")
+      }
+    } else {
+      if (filePath.exists()) {
+        if (filePath.getSize > 0) {
+          true
+        } else {
+          LOGGER.warn("No dictionary files found or empty dictionary files! " +
+                      "Won't generate new dictionary.")
+          false
+        }
+      } else {
+        throw new FileNotFoundException(
+          "The given dictionary file path is not found!")
+      }
+    }
+  }
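+  // Illustrative sketch (hypothetical paths): an allDictionaryPath of "/tmp/dict/*.dictionary"
+  // returns true if any file under /tmp/dict ends with ".dictionary" and is non-empty, whereas a
+  // concrete path such as "/tmp/dict/all.dictionary" must itself exist and be non-empty; a
+  // missing parent directory or file raises FileNotFoundException.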
+
+  /**
+   * get file headers from fact file
+   *
+   * @param carbonLoadModel
+   * @return headers
+   */
+  private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = {
+    var headers: Array[String] = null
+    val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0)
+    val readLine = CarbonUtil.readHeader(factFile)
+
+    if (null != readLine) {
+      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
+        "" + DEFAULT_SEPARATOR
+      } else {
+        carbonLoadModel.getCsvDelimiter
+      }
+      headers = readLine.toLowerCase().split(delimiter)
+    } else {
+      LOGGER.error("Not found file header! Please set fileheader")
+      throw new IOException("Failed to get file header")
+    }
+    headers
+  }
+
+  /**
+   * generate global dictionary with SQLContext and CarbonLoadModel:
+   * either from the source data files (plus optional predefined column dictionaries and
+   * dimension files) or, when an all-dictionary path is configured, from pre-generated
+   * dictionary files
+   *
+   * @param sqlContext      sql context
+   * @param carbonLoadModel carbon load model
+   */
+  def generateGlobalDictionary(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      dataFrame: Option[DataFrame] = None): Unit = {
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      // create dictionary folder if not exists
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+      val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
+      // columns which need to generate global dictionary file
+      val dimensions = carbonTable.getDimensionByTableName(
+        carbonTable.getFactTableName).asScala.toArray
+      // initialize the predefined column dictionary map
+      carbonLoadModel.initPredefDictMap()
+
+      val allDictionaryPath = carbonLoadModel.getAllDictPath
+      if (StringUtils.isEmpty(allDictionaryPath)) {
+        LOGGER.info("Generate global dictionary from source data files!")
+        // load data by using dataSource com.databricks.spark.csv
+        var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
+        var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+          df.columns
+        } else {
+          carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
+        }
+        headers = headers.map(headerName => headerName.trim)
+        val colDictFilePath = carbonLoadModel.getColDictFilePath
+        if (colDictFilePath != null) {
+          // generate predefined dictionary
+          generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+            dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
+        }
+        if (headers.length > df.columns.length) {
+          val msg = "The number of columns in the file header does not match the " +
+                    "number of columns in the data file; either the delimiter " +
+                    "or the fileheader provided is not correct"
+          LOGGER.error(msg)
+          throw new DataLoadingException(msg)
+        }
+        // use fact file to generate global dict
+        val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
+          headers, df.columns)
+        if (requireDimension.nonEmpty) {
+          // select column to push down pruning
+          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
+          val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+            requireDimension, storePath, dictfolderPath, false)
+          // combine distinct value in a block and partition by column
+          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
+            .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+          // generate global dictionary files
+          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+          // check result status
+          checkStatus(carbonLoadModel, sqlContext, model, statusList)
+        } else {
+          LOGGER.info("No column found for generating global dictionary in source data files")
+        }
+        // generate global dict from dimension file
+        if (carbonLoadModel.getDimFolderPath != null) {
+          val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
+          for (fileMap <- fileMapArray) {
+            val dimTableName = fileMap.split(":")(0)
+            var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
+            val (requireDimensionForDim, requireColumnNamesForDim) =
+              pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
+            if (requireDimensionForDim.length >= 1) {
+              dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
+                requireColumnNamesForDim.tail: _*)
+              val modelforDim = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+                requireDimensionForDim, storePath, dictfolderPath, false)
+              val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
+                dimDataframe.rdd, modelforDim)
+                .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
+              val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
+                inputRDDforDim, modelforDim).collect()
+              checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
+            } else {
+              LOGGER.info(s"No columns in dimension table $dimTableName " +
+                          "to generate global dictionary")
+            }
+          }
+        }
+      } else {
+        LOGGER.info("Generate global dictionary from dictionary files!")
+        val isNonempty = validateAllDictionaryPath(allDictionaryPath)
+        if (isNonempty) {
+          var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+            getHeaderFormFactFile(carbonLoadModel)
+          } else {
+            carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
+          }
+          headers = headers.map(headerName => headerName.trim)
+          // prune the dimension columns according to the CSV file header
+          val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
+          if (requireDimension.nonEmpty) {
+            val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+              requireDimension, storePath, dictfolderPath, false)
+            // accumulator to check whether the dictionary files contain bad records
+            val accumulator = sqlContext.sparkContext.accumulator(0)
+            // read local dictionary file, and group by key
+            val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
+              requireColumnNames, allDictionaryPath, accumulator)
+            // read existing dictionaries and combine
+            val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
+              .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+            // generate global dictionary files
+            val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+            // check result status
+            checkStatus(carbonLoadModel, sqlContext, model, statusList)
+            // if the dictionary contains wrongly formatted records, throw an exception
+            if (accumulator.value > 0) {
+              throw new DataLoadingException("Data Loading failure, dictionary values are " +
+                                             "not in correct format!")
+            }
+          } else {
+            LOGGER.info("have no column need to generate global dictionary")
+          }
+        }
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "generating global dictionary failed")
+        throw ex
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
new file mode 100644
index 0000000..79c0cc8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+
+/**
+ * Implicit conversions for [[TableIdentifier]]
+ */
+object CarbonTableIdentifierImplicit {
+  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+
+  implicit def toTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+    tableIdentifier match {
+      case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
+      case Seq(tableName) => TableIdentifier(tableName, None)
+      case _ => throw new IllegalArgumentException("invalid table identifier: " + tableIdentifier)
+    }
+  }
+
+  implicit def toSequence(tableIdentifier: TableIdentifier): Seq[String] = {
+    tableIdentifier.database match {
+      case Some(dbName) => Seq(dbName, tableIdentifier.table)
+      case _ => Seq(tableIdentifier.table)
+    }
+  }
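+  // Illustrative usage (a sketch): with these implicits in scope,
+  //   import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
+  //   val id: TableIdentifier = Seq("default", "t1")  // TableIdentifier("t1", Some("default"))
+  //   val parts: Seq[String] = id                     // Seq("default", "t1")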
+}