Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:47 UTC
[09/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..af349a8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet}
+import scala.collection.mutable
+
+import org.apache.spark.Partition
+import org.apache.spark.scheduler.TaskLocation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * DataLoadPartitionCoalescer
+ * Repartition the partitions of the RDD into fewer partitions, one partition per node.
+ * example:
+ * blk_hst host1 host2 host3 host4 host5
+ * block1 host1 host2 host3
+ * block2 host2 host4 host5
+ * block3 host3 host4 host5
+ * block4 host1 host2 host4
+ * block5 host1 host3 host4
+ * block6 host1 host2 host5
+ * -------------------------------------------------------
+ * 1. sort host by number of blocks
+ * -------------------------------------------------------
+ * host3: block1 block3 block5
+ * host5: block2 block3 block6
+ * host1: block1 block4 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 2. sort the blocks of each host
+ * new partitions are before old partitions
+ * -------------------------------------------------------
+ * host3: block1 block3 block5
+ * host5: block2 block6+block3
+ * host1: block4+block1 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 3. assign blocks to host
+ * -------------------------------------------------------
+ * step1: host3 choose block1, remove from host1, host2
+ * step2: host5 choose block2, remove from host2, host4
+ * step3: host1 choose block4, .....
+ * -------------------------------------------------------
+ * result:
+ * host3: block1 block5
+ * host5: block2
+ * host1: block4
+ * host2: block6
+ * host4: block3
+ */
+class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val prevPartitions = prev.partitions
+ var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+ // host => partition id list
+ val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
+ // partition id => host list
+ val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
+ val noLocalityPartitions = new ArrayBuffer[Int]
+ var noLocality = true
+ /**
+ * assign a task location for a partition
+ */
+ private def getLocation(index: Int): Option[String] = {
+ if (index < nodeList.length) {
+ Some(nodeList(index))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * group the partitions of the parent RDD by node
+ */
+ private def groupByNode(): Unit = {
+ // initialize hostMapPartitionIds
+ nodeList.foreach { node =>
+ val map = new LinkedHashSet[Int]
+ hostMapPartitionIds.put(node, map)
+ }
+ // collect partitions for each node
+ val tmpNoLocalityPartitions = new ArrayBuffer[Int]
+ prevPartitions.foreach { p =>
+ val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
+ if (locs.isEmpty) {
+ // if a partition has no location, add to noLocalityPartitions
+ tmpNoLocalityPartitions += p.index
+ } else {
+ // add partition to hostMapPartitionIds and partitionIdMapHosts
+ locs.foreach { loc =>
+ val host = loc.host
+ hostMapPartitionIds.get(host) match {
+ // if the location of the partition is not in node list,
+ // will add this partition to noLocalityPartitions
+ case None => tmpNoLocalityPartitions += p.index
+ case Some(ids) =>
+ noLocality = false
+ ids += p.index
+ partitionIdMapHosts.get(p.index) match {
+ case None =>
+ val hosts = new ArrayBuffer[String]
+ hosts += host
+ partitionIdMapHosts.put(p.index, hosts)
+ case Some(hosts) =>
+ hosts += host
+ }
+ }
+ }
+ }
+ }
+
+ // keep only the partitions that truly have no locality
+ tmpNoLocalityPartitions.distinct.foreach {index =>
+ partitionIdMapHosts.get(index) match {
+ case None => noLocalityPartitions += index
+ case Some(_) =>
+ }
+ }
+ }
+
+ /**
+ * sort host and partitions
+ */
+ private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = {
+ val oldPartitionIdSet = new HashSet[Int]
+ // sort host by number of partitions
+ hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc =>
+ // order: newPartitionIds + oldPartitionIds
+ val sortedPartitionIdSet = new LinkedHashSet[Int]
+ var newPartitionIds = new ArrayBuffer[Int]
+ var oldPartitionIds = new ArrayBuffer[Int]
+ loc._2.foreach { p =>
+ if (oldPartitionIdSet.contains(p)) {
+ oldPartitionIds += p
+ } else {
+ newPartitionIds += p
+ oldPartitionIdSet.add(p)
+ }
+ }
+ // sort and add new partitions
+ newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+ // sort and add old partitions
+ oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+ // update hostMapPartitionIds
+ hostMapPartitionIds.put(loc._1, sortedPartitionIdSet)
+ (loc._1, sortedPartitionIdSet)
+ }.toArray
+ }
+
+ /**
+ * assign locality partition to each host
+ */
+ private def assignPartitonNodeLocality(
+ noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = {
+ val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length)
+ for (i <- 0 until localityResult.length) {
+ localityResult(i) = new ArrayBuffer[Int]
+ }
+ val noEmptyHostSet = new HashSet[String]
+ noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)}
+
+ var hostIndex = 0
+ while (noEmptyHostSet.nonEmpty) {
+ val hostEntry = noEmptyHosts(hostIndex)
+ if (noEmptyHostSet.contains(hostEntry._1)) {
+ if (hostEntry._2.nonEmpty) {
+ var partitionId = hostEntry._2.iterator.next
+ localityResult(hostIndex) += partitionId
+ // remove the chosen partition from every host that lists it
+ partitionIdMapHosts.get(partitionId) match {
+ case Some(locs) =>
+ locs.foreach { loc =>
+ hostMapPartitionIds.get(loc) match {
+ case Some(parts) =>
+ parts.remove(partitionId)
+ }
+ }
+ }
+ } else {
+ noEmptyHostSet.remove(hostEntry._1)
+ }
+ }
+
+ hostIndex = hostIndex + 1
+ if (hostIndex == noEmptyHosts.length) {
+ hostIndex = 0
+ }
+ }
+ localityResult
+ }
+
+ /**
+ * assign no locality partitions to each host
+ */
+ private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String],
+ noEmptyHosts: mutable.Buffer[String],
+ localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = {
+ val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length)
+ LOGGER.info(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}")
+ val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length)
+ for (i <- 0 until noLocalityResult.length) {
+ noLocalityResult(i) = new ArrayBuffer[Int]
+ }
+ var noLocalityPartitionIndex = 0
+ if (noLocalityPartitions.nonEmpty) {
+ if (emptyHosts.nonEmpty) {
+ // at first, assign avg number to empty node
+ for (i <- 0 until avgNumber) {
+ noLocalityResult.foreach { partitionIds =>
+ if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+ partitionIds += noLocalityPartitions(noLocalityPartitionIndex)
+ noLocalityPartitionIndex = noLocalityPartitionIndex + 1
+ }
+ }
+ }
+ }
+ // still have no locality partitions
+ // assign to all hosts
+ if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+ var partIndex = 0
+ for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) {
+ if (partIndex < localityResult.length) {
+ localityResult(partIndex) += noLocalityPartitions(i)
+ } else {
+ noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i)
+ }
+ partIndex = partIndex + 1
+ if (partIndex == localityResult.length + noLocalityResult.length) {
+ partIndex = 0
+ }
+ }
+ }
+ }
+ noLocalityResult
+ }
+
+ /**
+ * no locality repartition
+ */
+ private def repartitionNoLocality(): Array[Partition] = {
+ // no locality repartition
+ LOGGER.info("no locality partition")
+ val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts)
+ for (i <- 0 until numOfParts) {
+ prevPartIndexs(i) = new ArrayBuffer[Int]
+ }
+ for (i <- 0 until prevPartitions.length) {
+ prevPartIndexs(i % numOfParts) += prevPartitions(i).index
+ }
+ prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
+ new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
+ }
+ }
+
+ private def repartitionLocality(): Array[Partition] = {
+ LOGGER.info("locality partition")
+ val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq
+ // empty host seq
+ val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer
+ // non empty host array
+ var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty)
+
+ // 1. do locality repartition
+ // sort host and partitions
+ tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts)
+ // assign locality partition to non empty hosts
+ val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts)
+ // collect non empty hosts and empty hosts
+ val noEmptyHosts = mutable.Buffer[String]()
+ val localityResult = mutable.Buffer[ArrayBuffer[Int]]()
+ for(index <- 0 until templocalityResult.size) {
+ if (templocalityResult(index).isEmpty) {
+ emptyHosts += tempNoEmptyHosts(index)._1
+ } else {
+ noEmptyHosts += tempNoEmptyHosts(index)._1
+ localityResult += templocalityResult(index)
+ }
+ }
+ // 2. do no locality repartition
+ // assign no locality partitions to all hosts
+ val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult)
+
+ // 3. generate CoalescedRDDPartition
+ (0 until localityResult.length + noLocalityResult.length).map { index =>
+ val ids = if (index < localityResult.length) {
+ localityResult(index).toArray
+ } else {
+ noLocalityResult(index - localityResult.length).toArray
+ }
+ val loc = if (index < localityResult.length) {
+ Some(noEmptyHosts(index))
+ } else {
+ Some(emptyHosts(index - localityResult.length))
+ }
+ LOGGER.info(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
+ new CoalescedRDDPartition(index, prev, ids, loc)
+ }.filter(_.parentsIndices.nonEmpty).toArray
+
+ }
+
+ def run(): Array[Partition] = {
+ // 1. group partitions by node
+ groupByNode()
+ LOGGER.info(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}")
+ val partitions = if (noLocality) {
+ // 2.A no locality partition
+ repartitionNoLocality()
+ } else {
+ // 2.B locality partition
+ repartitionLocality()
+ }
+ DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions)
+ partitions
+ }
+}
+
+object DataLoadPartitionCoalescer {
+ def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = {
+ prev.context.getPreferredLocs(prev, p.index)
+ }
+
+ def getParentsIndices(p: Partition): Array[Int] = {
+ p.asInstanceOf[CoalescedRDDPartition].parentsIndices
+ }
+
+ def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+ val prevPartIds = new ArrayBuffer[Int]
+ parts.foreach{ p =>
+ prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p)
+ }
+ // every parent partition must be assigned exactly once.
+ assert(prevPartIds.size == prevParts.size)
+ val prevPartIdsMap = prevPartIds.map{ id =>
+ (id, id)
+ }.toMap
+ prevParts.foreach{ p =>
+ prevPartIdsMap.get(p.index) match {
+ case None => assert(false, "partition " + p.index + " not found")
+ case Some(_) =>
+ }
+ }
+ }
+}
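
For context, a minimal usage sketch of the coalescer above (not part of this commit): the class is constructed with the parent RDD and the node list, and run() returns the coalesced partitions. The names inputRdd and activeNodes are placeholders for values the caller would supply.

  import org.apache.spark.Partition
  import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}

  def coalesceForLoad(inputRdd: RDD[_], activeNodes: Array[String]): Array[Partition] = {
    // one coalesced partition per node, each grouping parent partition indices
    val coalescer = new DataLoadPartitionCoalescer(inputRdd, activeNodes)
    val partitions = coalescer.run()
    partitions.foreach { p =>
      // parent indices assigned to this coalesced partition
      val parents = DataLoadPartitionCoalescer.getParentsIndices(p)
      println(s"partition ${p.index}: parents ${parents.mkString(",")}")
    }
    partitions
  }
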
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
new file mode 100644
index 0000000..e23b58d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.tasks
+
+import java.io.IOException
+
+import scala.collection.mutable
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.spark.rdd.DictionaryLoadModel
+
+/**
+ *
+ * @param valuesBuffer distinct values gathered for the column
+ * @param dictionary existing dictionary of the column
+ * @param model dictionary load model
+ * @param columnIndex index of the column in the load model
+ * @param writer dictionary writer, created inside execute()
+ */
+class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
+ dictionary: Dictionary,
+ model: DictionaryLoadModel, columnIndex: Int,
+ var writer: CarbonDictionaryWriter = null) {
+
+ /**
+ * execute the task
+ *
+ * @return the list of distinct values written to the dictionary file
+ */
+ def execute(): java.util.List[String] = {
+ val values = valuesBuffer.toArray
+ java.util.Arrays.sort(values, Ordering[String])
+ val dictService = CarbonCommonFactory.getDictionaryService
+ writer = dictService.getDictionaryWriter(
+ model.table,
+ model.columnIdentifier(columnIndex),
+ model.hdfsLocation)
+ val distinctValues: java.util.List[String] = new java.util.ArrayList()
+
+ try {
+ if (!model.dictFileExists(columnIndex)) {
+ writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+ distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+ }
+
+ if (values.length >= 1) {
+ if (model.dictFileExists(columnIndex)) {
+ for (value <- values) {
+ val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
+ model.primDimensions(columnIndex))
+ if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
+ CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+ writer.write(parsedValue)
+ distinctValues.add(parsedValue)
+ }
+ }
+
+ } else {
+ for (value <- values) {
+ val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
+ model.primDimensions(columnIndex))
+ if (null != parsedValue) {
+ writer.write(parsedValue)
+ distinctValues.add(parsedValue)
+ }
+ }
+ }
+ }
+ } catch {
+ case ex: IOException =>
+ throw ex
+ } finally {
+ if (null != writer) {
+ writer.close()
+ }
+ }
+ distinctValues
+ }
+
+ /**
+ * update dictionary metadata
+ */
+ def updateMetaData() {
+ if (null != writer) {
+ writer.commit()
+ }
+ }
+}
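
A hedged sketch of how this task might be driven (not part of this commit); valuesBuffer, dictionary, model and columnIndex stand for values the surrounding dictionary-generation code would supply.

  import scala.collection.mutable
  import org.apache.carbondata.core.cache.dictionary.Dictionary
  import org.apache.carbondata.spark.rdd.DictionaryLoadModel
  import org.apache.carbondata.spark.tasks.DictionaryWriterTask

  def writeColumnDictionary(valuesBuffer: mutable.HashSet[String],
      dictionary: Dictionary,
      model: DictionaryLoadModel,
      columnIndex: Int): java.util.List[String] = {
    val task = new DictionaryWriterTask(valuesBuffer, dictionary, model, columnIndex)
    // writes only the values that are new to the dictionary and returns them
    val distinctValues = task.execute()
    // commit the dictionary file so the new entries become visible
    task.updateMetaData()
    distinctValues
  }
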
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
new file mode 100644
index 0000000..d552331
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.tasks
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.apache.carbondata.spark.rdd.DictionaryLoadModel
+
+/**
+ * This task writes sort index file
+ *
+ * @param model
+ * @param index
+ * @param dictionary
+ * @param distinctValues
+ * @param carbonDictionarySortIndexWriter
+ */
+class SortIndexWriterTask(model: DictionaryLoadModel,
+ index: Int,
+ dictionary: Dictionary,
+ distinctValues: java.util.List[String],
+ var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null) {
+ def execute() {
+ try {
+ if (distinctValues.size() > 0) {
+ val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
+ val dictService = CarbonCommonFactory.getDictionaryService
+ val dictionarySortInfo: CarbonDictionarySortInfo =
+ preparator.getDictionarySortInfo(distinctValues, dictionary,
+ model.primDimensions(index).getDataType)
+ carbonDictionarySortIndexWriter =
+ dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
+ model.hdfsLocation)
+ carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+ carbonDictionarySortIndexWriter
+ .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
+ }
+ } finally {
+ if (null != carbonDictionarySortIndexWriter) {
+ carbonDictionarySortIndexWriter.close()
+ }
+ }
+ }
+}
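
A short follow-up sketch (again not part of the commit): the sort index writer would presumably be invoked with the distinct values returned by DictionaryWriterTask in the previous sketch, for the same column index.

  // hypothetical continuation of writeColumnDictionary from the previous sketch
  new SortIndexWriterTask(model, columnIndex, dictionary, distinctValues).execute()
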
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
new file mode 100644
index 0000000..dc63186
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.io.File
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonDataType}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonScalaUtil {
+ def convertSparkToCarbonDataType(
+ dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
+ dataType match {
+ case StringType => CarbonDataType.STRING
+ case ShortType => CarbonDataType.SHORT
+ case IntegerType => CarbonDataType.INT
+ case LongType => CarbonDataType.LONG
+ case DoubleType => CarbonDataType.DOUBLE
+ case FloatType => CarbonDataType.FLOAT
+ case DateType => CarbonDataType.DATE
+ case BooleanType => CarbonDataType.BOOLEAN
+ case TimestampType => CarbonDataType.TIMESTAMP
+ case ArrayType(_, _) => CarbonDataType.ARRAY
+ case StructType(_) => CarbonDataType.STRUCT
+ case NullType => CarbonDataType.NULL
+ case _ => CarbonDataType.DECIMAL
+ }
+ }
+
+ def convertSparkToCarbonSchemaDataType(dataType: String): String = {
+ dataType match {
+ case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING
+ case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER
+ case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER
+ case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT
+ case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC
+ case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC
+ case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC
+ case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC
+ case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING
+ case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING
+ case CarbonCommonConstants.TIMESTAMP_TYPE => CarbonCommonConstants.TIMESTAMP
+ case anyType => anyType
+ }
+ }
+
+ def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
+ dataType match {
+ case CarbonDataType.STRING => StringType
+ case CarbonDataType.SHORT => ShortType
+ case CarbonDataType.INT => IntegerType
+ case CarbonDataType.LONG => LongType
+ case CarbonDataType.DOUBLE => DoubleType
+ case CarbonDataType.BOOLEAN => BooleanType
+ case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
+ case CarbonDataType.TIMESTAMP => TimestampType
+ }
+ }
+
+ def updateDataType(
+ currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
+ currentDataType match {
+ case decimal: DecimalType =>
+ val scale = currentDataType.asInstanceOf[DecimalType].scale
+ DecimalType(DecimalType.MAX_PRECISION, scale)
+ case _ =>
+ currentDataType
+ }
+ }
+
+ def getKettleHome(sqlContext: SQLContext): String = {
+ var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
+ if (null == kettleHomePath) {
+ kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
+ }
+ if (null == kettleHomePath) {
+ val carbonHome = System.getenv("CARBON_HOME")
+ if (null != carbonHome) {
+ kettleHomePath = carbonHome + "/processing/carbonplugins"
+ }
+ }
+ if (kettleHomePath != null) {
+ val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase()
+ // get the spark master; if it is local, the kettle home path may need to be corrected
+ // e.g. with --master local, the executor runs on the local machine
+ if (sparkMaster.startsWith("local")) {
+ val kettleHomeFileType = FileFactory.getFileType(kettleHomePath)
+ val kettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, kettleHomeFileType)
+ // check whether the carbon.kettle.home path exists
+ if (!kettleHomeFile.exists()) {
+ // get the path of this class
+ // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-
+ // xxx.jar!/org/carbondata/spark/rdd/
+ var jarFilePath = this.getClass.getResource("").getPath
+ val endIndex = jarFilePath.indexOf(".jar!") + 4
+ // get the jar file path
+ // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-*.jar
+ jarFilePath = jarFilePath.substring(0, endIndex)
+ val jarFileType = FileFactory.getFileType(jarFilePath)
+ val jarFile = FileFactory.getCarbonFile(jarFilePath, jarFileType)
+ // get the parent folder of the jar file
+ // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib
+ val carbonLibPath = jarFile.getParentFile.getPath
+ // find the kettle home under that carbonlib folder
+ // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbonplugins
+ kettleHomePath = carbonLibPath + File.separator + CarbonCommonConstants.KETTLE_HOME_NAME
+ val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ logger.error(s"carbon.kettle.home path is not exists, reset it as $kettleHomePath")
+ val newKettleHomeFileType = FileFactory.getFileType(kettleHomePath)
+ val newKettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, newKettleHomeFileType)
+ // check if the found kettle home exists
+ if (!newKettleHomeFile.exists()) {
+ sys.error("Kettle home not found. Failed to reset carbon.kettle.home")
+ }
+ }
+ }
+ } else {
+ sys.error("carbon.kettle.home is not set")
+ }
+ kettleHomePath
+ }
+
+ def getString(value: Any,
+ serializationNullFormat: String,
+ delimiterLevel1: String,
+ delimiterLevel2: String,
+ format: SimpleDateFormat,
+ level: Int = 1): String = {
+ if (value == null) {
+ serializationNullFormat
+ } else {
+ value match {
+ case s: String => s
+ case d: java.math.BigDecimal => d.toPlainString
+ case i: java.lang.Integer => i.toString
+ case d: java.lang.Double => d.toString
+ case t: java.sql.Timestamp => format format t
+ case d: java.sql.Date => format format d
+ case b: java.lang.Boolean => b.toString
+ case s: java.lang.Short => s.toString
+ case f: java.lang.Float => f.toString
+ case bs: Array[Byte] => new String(bs)
+ case s: scala.collection.Seq[Any] =>
+ val delimiter = if (level == 1) {
+ delimiterLevel1
+ } else {
+ delimiterLevel2
+ }
+ val builder = new StringBuilder()
+ s.foreach { x =>
+ builder.append(getString(x, serializationNullFormat, delimiterLevel1,
+ delimiterLevel2, format, level + 1)).append(delimiter)
+ }
+ builder.substring(0, builder.length - 1)
+ case m: scala.collection.Map[Any, Any] =>
+ throw new Exception("Unsupported data type: Map")
+ case r: org.apache.spark.sql.Row =>
+ val delimiter = if (level == 1) {
+ delimiterLevel1
+ } else {
+ delimiterLevel2
+ }
+ val builder = new StringBuilder()
+ for (i <- 0 until r.length) {
+ builder.append(getString(r(i), serializationNullFormat, delimiterLevel1,
+ delimiterLevel2, format, level + 1)).append(delimiter)
+ }
+ builder.substring(0, builder.length - 1)
+ case other => other.toString
+ }
+ }
+ }
+}
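
As an illustration of getString's flattening behaviour (the values and delimiters below are arbitrary examples, not taken from the commit):

  import java.text.SimpleDateFormat
  import org.apache.carbondata.spark.util.CarbonScalaUtil

  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  // null values are replaced by the serialization null format
  CarbonScalaUtil.getString(null, "\\N", "$", ":", format)                           // "\N"
  // a Seq is joined with the level-1 delimiter
  CarbonScalaUtil.getString(Seq("a", "b", "c"), "\\N", "$", ":", format)             // "a$b$c"
  // nested structures one level deeper use the level-2 delimiter
  CarbonScalaUtil.getString(Seq(Seq("a", "b"), Seq("c")), "\\N", "$", ":", format)   // "a:b$c"
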
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
new file mode 100644
index 0000000..1c9d774
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.execution.command.ColumnProperty
+import org.apache.spark.sql.execution.command.Field
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+object CommonUtil {
+ def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String],
+ msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) {
+ val colGrpCols = colGroup.split(',').map(_.trim)
+ colGrpCols.foreach { x =>
+ // if column is no dictionary
+ if (noDictionaryDims.contains(x)) {
+ throw new MalformedCarbonCommandException(
+ "Column group is not supported for no dictionary columns:" + x)
+ } else if (msrs.exists(msr => msr.column.equals(x))) {
+ // if column is measure
+ throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x)
+ } else if (foundIndExistingColGrp(x)) {
+ throw new MalformedCarbonCommandException("Column is available in other column group:" + x)
+ } else if (isComplex(x, dims)) {
+ throw new MalformedCarbonCommandException(
+ "Column group doesn't support Complex column:" + x)
+ } else if (isTimeStampColumn(x, dims)) {
+ throw new MalformedCarbonCommandException(
+ "Column group doesn't support Timestamp datatype:" + x)
+ } else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
+ // the column is not present in the dimension list
+ throw new MalformedCarbonCommandException(
+ "column in column group is not a valid column: " + x
+ )
+ }
+ }
+ // check if given column is present in other groups
+ def foundIndExistingColGrp(colName: String): Boolean = {
+ retrievedColGrps.foreach { colGrp =>
+ if (colGrp.split(",").contains(colName)) {
+ return true
+ }
+ }
+ false
+ }
+
+ }
+
+
+ def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = {
+ dims.foreach { dim =>
+ if (dim.column.equalsIgnoreCase(colName)) {
+ if (dim.dataType.isDefined && null != dim.dataType.get &&
+ "timestamp".equalsIgnoreCase(dim.dataType.get)) {
+ return true
+ }
+ }
+ }
+ false
+ }
+
+ def isComplex(colName: String, dims: Seq[Field]): Boolean = {
+ dims.foreach { x =>
+ if (x.children.isDefined && null != x.children.get && x.children.get.nonEmpty) {
+ val children = x.children.get
+ if (x.column.equals(colName)) {
+ return true
+ } else {
+ children.foreach { child =>
+ val fieldName = x.column + "." + child.column
+ if (fieldName.equalsIgnoreCase(colName)) {
+ return true
+ }
+ }
+ }
+ }
+ }
+ false
+ }
+
+ def getColumnProperties(column: String,
+ tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = {
+ val fieldProps = new util.ArrayList[ColumnProperty]()
+ val columnPropertiesStartKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
+ tableProperties.foreach {
+ case (key, value) =>
+ if (key.startsWith(columnPropertiesStartKey)) {
+ fieldProps.add(ColumnProperty(key.substring(columnPropertiesStartKey.length(),
+ key.length()), value))
+ }
+ }
+ if (fieldProps.isEmpty) {
+ None
+ } else {
+ Some(fieldProps)
+ }
+ }
+
+ def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = {
+ val itr = tableProperties.keys
+ var isValid: Boolean = true
+ tableProperties.foreach {
+ case (key, value) =>
+ if (!validateFields(key, fields)) {
+ isValid = false
+ throw new MalformedCarbonCommandException(s"Invalid table properties ${ key }")
+ }
+ }
+ isValid
+ }
+
+ def validateFields(key: String, fields: Seq[Field]): Boolean = {
+ var isValid: Boolean = false
+ fields.foreach { field =>
+ if (field.children.isDefined && field.children.get != null) {
+ field.children.foreach(fields => {
+ fields.foreach(complexfield => {
+ val column = if ("val" == complexfield.column) {
+ field.column
+ } else {
+ field.column + "." + complexfield.column
+ }
+ if (validateColumnProperty(key, column)) {
+ isValid = true
+ }
+ }
+ )
+ }
+ )
+ } else {
+ if (validateColumnProperty(key, field.column)) {
+ isValid = true
+ }
+ }
+
+ }
+ isValid
+ }
+
+ def validateColumnProperty(key: String, column: String): Boolean = {
+ if (!key.startsWith(CarbonCommonConstants.COLUMN_PROPERTIES)) {
+ return true
+ }
+ val columnPropertyKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
+ if (key.startsWith(columnPropertyKey)) {
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * @param colGrps
+ * @param dims
+ * @return columns of column groups in schema order
+ */
+ def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): Seq[String] = {
+ def sortByIndex(colGrp1: String, colGrp2: String) = {
+ val firstCol1 = colGrp1.split(",")(0)
+ val firstCol2 = colGrp2.split(",")(0)
+ val dimIndex1: Int = getDimIndex(firstCol1, dims)
+ val dimIndex2: Int = getDimIndex(firstCol2, dims)
+ dimIndex1 < dimIndex2
+ }
+ val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex)
+ sortedColGroups
+ }
+
+ /**
+ * @param colName
+ * @param dims
+ * @return return index for given column in dims
+ */
+ def getDimIndex(colName: String, dims: Seq[Field]): Int = {
+ var index: Int = -1
+ dims.zipWithIndex.foreach { h =>
+ if (h._1.column.equalsIgnoreCase(colName)) {
+ index = h._2.toInt
+ }
+ }
+ index
+ }
+
+ /**
+ * This method will validate the table block size specified by the user
+ *
+ * @param tableProperties
+ */
+ def validateTableBlockSize(tableProperties: Map[String, String]): Unit = {
+ var tableBlockSize: Integer = 0
+ if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) {
+ val blockSizeStr: String =
+ parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE))
+ try {
+ tableBlockSize = Integer.parseInt(blockSizeStr)
+ } catch {
+ case e: NumberFormatException =>
+ throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
+ s"$blockSizeStr, only int value from 1 MB to " +
+ s"2048 MB is supported.")
+ }
+ if (tableBlockSize < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL ||
+ tableBlockSize > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) {
+ throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
+ s"$blockSizeStr, only int value from 1 MB to " +
+ s"2048 MB is supported.")
+ }
+ tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, blockSizeStr)
+ }
+ }
+
+ /**
+ * This method will parse the configure string from 'XX MB/M' to 'XX'
+ *
+ * @param propertyValueString
+ */
+ def parsePropertyValueStringInMB(propertyValueString: String): String = {
+ var parsedPropertyValueString: String = propertyValueString
+ if (propertyValueString.trim.toLowerCase.endsWith("mb")) {
+ parsedPropertyValueString = propertyValueString.trim.toLowerCase
+ .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("mb")).trim
+ }
+ if (propertyValueString.trim.toLowerCase.endsWith("m")) {
+ parsedPropertyValueString = propertyValueString.trim.toLowerCase
+ .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("m")).trim
+ }
+ parsedPropertyValueString
+ }
+
+ def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
+ val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
+ val details = SegmentStatusManager.readLoadMetadata(metadataPath)
+ model.setLoadMetadataDetails(details.toList.asJava)
+ }
+}
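
A quick sketch of the block-size parsing above (illustrative only, not part of the commit):

  import org.apache.carbondata.spark.util.CommonUtil

  CommonUtil.parsePropertyValueStringInMB("512 MB")  // "512"
  CommonUtil.parsePropertyValueStringInMB("512m")    // "512"
  CommonUtil.parsePropertyValueStringInMB("512")     // "512"
  // validateTableBlockSize would then reject values outside
  // [BLOCK_SIZE_MIN_VAL, BLOCK_SIZE_MAX_VAL] with a MalformedCarbonCommandException
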
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
new file mode 100644
index 0000000..5ec96df
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+
+object DataTypeConverterUtil {
+ def convertToCarbonType(dataType: String): DataType = {
+ dataType.toLowerCase match {
+ case "string" => DataType.STRING
+ case "int" => DataType.INT
+ case "integer" => DataType.INT
+ case "tinyint" => DataType.SHORT
+ case "short" => DataType.SHORT
+ case "long" => DataType.LONG
+ case "bigint" => DataType.LONG
+ case "numeric" => DataType.DOUBLE
+ case "double" => DataType.DOUBLE
+ case "decimal" => DataType.DECIMAL
+ case "timestamp" => DataType.TIMESTAMP
+ case "array" => DataType.ARRAY
+ case "struct" => DataType.STRUCT
+ case _ => convertToCarbonTypeForSpark2(dataType)
+ }
+ }
+
+ def convertToCarbonTypeForSpark2(dataType: String): DataType = {
+ dataType.toLowerCase match {
+ case "stringtype" => DataType.STRING
+ case "inttype" => DataType.INT
+ case "integertype" => DataType.INT
+ case "tinyinttype" => DataType.SHORT
+ case "shorttype" => DataType.SHORT
+ case "longtype" => DataType.LONG
+ case "biginttype" => DataType.LONG
+ case "numerictype" => DataType.DOUBLE
+ case "doubletype" => DataType.DOUBLE
+ case "decimaltype" => DataType.DECIMAL
+ case "timestamptype" => DataType.TIMESTAMP
+ case "arraytype" => DataType.ARRAY
+ case "structtype" => DataType.STRUCT
+ case _ => sys.error(s"Unsupported data type: $dataType")
+ }
+ }
+
+ def convertToString(dataType: DataType): String = {
+ dataType match {
+ case DataType.STRING => "string"
+ case DataType.SHORT => "smallint"
+ case DataType.INT => "int"
+ case DataType.LONG => "bigint"
+ case DataType.DOUBLE => "double"
+ case DataType.DECIMAL => "decimal"
+ case DataType.TIMESTAMP => "timestamp"
+ case DataType.ARRAY => "array"
+ case DataType.STRUCT => "struct"
+ }
+ }
+}
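
A few illustrative conversions (not part of the commit):

  import org.apache.carbondata.core.carbon.metadata.datatype.DataType
  import org.apache.carbondata.spark.util.DataTypeConverterUtil

  DataTypeConverterUtil.convertToCarbonType("bigint")      // DataType.LONG
  DataTypeConverterUtil.convertToCarbonType("StringType")  // DataType.STRING (Spark 2 style name)
  DataTypeConverterUtil.convertToString(DataType.SHORT)    // "smallint"
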
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
new file mode 100644
index 0000000..e650bfe
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -0,0 +1,843 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.io.{FileNotFoundException, IOException}
+import java.nio.charset.Charset
+import java.util.regex.Pattern
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.language.implicitConversions
+import scala.util.control.Breaks.{break, breakable}
+
+import org.apache.commons.lang3.{ArrayUtils, StringUtils}
+import org.apache.spark.Accumulator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.reader.CarbonDictionaryReader
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.CarbonSparkFactory
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd._
+
+/**
+ * An object which provides methods to generate a global dictionary from CSV files.
+ */
+object GlobalDictionaryUtil {
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * The default separator to use if none is supplied to the constructor.
+ */
+ val DEFAULT_SEPARATOR: Char = ','
+ /**
+ * The default quote character to use if none is supplied to the
+ * constructor.
+ */
+ val DEFAULT_QUOTE_CHARACTER: Char = '"'
+
+ /**
+ * find the columns for which a global dictionary needs to be generated.
+ *
+ * @param dimensions dimension list of schema
+ * @param headers column headers
+ * @param columns column list of csv file
+ */
+ def pruneDimensions(dimensions: Array[CarbonDimension],
+ headers: Array[String],
+ columns: Array[String]): (Array[CarbonDimension], Array[String]) = {
+ val dimensionBuffer = new ArrayBuffer[CarbonDimension]
+ val columnNameBuffer = new ArrayBuffer[String]
+ val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY,
+ Encoding.DIRECT_DICTIONARY))
+ dimensionsWithDict.foreach { dim =>
+ breakable {
+ headers.zipWithIndex.foreach { h =>
+ if (dim.getColName.equalsIgnoreCase(h._1)) {
+ dimensionBuffer += dim
+ columnNameBuffer += columns(h._2)
+ break
+ }
+ }
+ }
+ }
+ (dimensionBuffer.toArray, columnNameBuffer.toArray)
+ }
+
+ /**
+ * use this method to check whether a CarbonDimension uses a given encoding or not
+ *
+ * @param dimension carbonDimension
+ * @param encoding the encoding to look for
+ * @param excludeEncoding the encoding to exclude
+ */
+ def hasEncoding(dimension: CarbonDimension,
+ encoding: Encoding,
+ excludeEncoding: Encoding): Boolean = {
+ if (dimension.isComplex()) {
+ val children = dimension.getListOfChildDimensions
+ children.asScala.exists(hasEncoding(_, encoding, excludeEncoding))
+ } else {
+ dimension.hasEncoding(encoding) &&
+ (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))
+ }
+ }
+
+ def gatherDimensionByEncoding(carbonLoadModel: CarbonLoadModel,
+ dimension: CarbonDimension,
+ encoding: Encoding,
+ excludeEncoding: Encoding,
+ dimensionsWithEncoding: ArrayBuffer[CarbonDimension],
+ forPreDefDict: Boolean) {
+ if (dimension.isComplex) {
+ val children = dimension.getListOfChildDimensions.asScala
+ children.foreach { c =>
+ gatherDimensionByEncoding(carbonLoadModel, c, encoding, excludeEncoding,
+ dimensionsWithEncoding, forPreDefDict)
+ }
+ } else {
+ if (dimension.hasEncoding(encoding) &&
+ (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) {
+ if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) ||
+ (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
+ dimensionsWithEncoding += dimension
+ }
+ }
+ }
+ }
+
+ def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel,
+ dimension: CarbonDimension,
+ forPreDefDict: Boolean): Array[CarbonDimension] = {
+ val dimensionsWithDict = new ArrayBuffer[CarbonDimension]
+ gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY,
+ Encoding.DIRECT_DICTIONARY,
+ dimensionsWithDict, forPreDefDict)
+ dimensionsWithDict.toArray
+ }
+
+ /**
+ * invoke CarbonDictionaryWriter to write dictionary to file.
+ *
+ * @param model instance of DictionaryLoadModel
+ * @param columnIndex the index of current column in column list
+ * @param iter distinct value list of dictionary
+ */
+ def writeGlobalDictionaryToFile(model: DictionaryLoadModel,
+ columnIndex: Int,
+ iter: Iterator[String]): Unit = {
+ val dictService = CarbonCommonFactory.getDictionaryService
+ val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
+ model.table,
+ model.columnIdentifier(columnIndex),
+ model.hdfsLocation
+ )
+ try {
+ while (iter.hasNext) {
+ writer.write(iter.next)
+ }
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * read global dictionary from cache
+ */
+ def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = {
+ val dictMap = new HashMap[String, Dictionary]
+ model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m =>
+ val dict = CarbonLoaderUtil.getDictionary(model.table,
+ m._1.getColumnIdentifier, model.hdfsLocation,
+ m._1.getDataType
+ )
+ dictMap.put(m._1.getColumnId, dict)
+ }
+ dictMap
+ }
+
+ /**
+ * invoke CarbonDictionaryReader to read dictionary from files.
+ *
+ * @param model carbon dictionary load model
+ */
+ def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = {
+ val dictMap = new HashMap[String, HashSet[String]]
+ val dictService = CarbonCommonFactory.getDictionaryService
+ for (i <- model.primDimensions.indices) {
+ val set = new HashSet[String]
+ if (model.dictFileExists(i)) {
+ val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table,
+ model.columnIdentifier(i), model.hdfsLocation
+ )
+ val values = reader.read
+ if (values != null) {
+ for (j <- 0 until values.size) {
+ set.add(new String(values.get(j),
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
+ }
+ }
+ }
+ dictMap.put(model.primDimensions(i).getColumnId, set)
+ }
+ dictMap
+ }
+
+ def generateParserForChildrenDimension(dim: CarbonDimension,
+ format: DataFormat,
+ mapColumnValuesWithId:
+ HashMap[String, HashSet[String]],
+ generic: GenericParser): Unit = {
+ val children = dim.getListOfChildDimensions.asScala
+ for (i <- children.indices) {
+ generateParserForDimension(Some(children(i)), format.cloneAndIncreaseIndex,
+ mapColumnValuesWithId) match {
+ case Some(childDim) =>
+ generic.addChild(childDim)
+ case None =>
+ }
+ }
+ }
+
+ def generateParserForDimension(dimension: Option[CarbonDimension],
+ format: DataFormat,
+ mapColumnValuesWithId: HashMap[String, HashSet[String]]): Option[GenericParser] = {
+ dimension match {
+ case None =>
+ None
+ case Some(dim) =>
+ dim.getDataType match {
+ case DataType.ARRAY =>
+ val arrDim = ArrayParser(dim, format)
+ generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, arrDim)
+ Some(arrDim)
+ case DataType.STRUCT =>
+ val stuDim = StructParser(dim, format)
+ generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, stuDim)
+ Some(stuDim)
+ case _ =>
+ Some(PrimitiveParser(dim, mapColumnValuesWithId.get(dim.getColumnId)))
+ }
+ }
+ }
+
+ def createDataFormat(delimiters: Array[String]): DataFormat = {
+ if (ArrayUtils.isNotEmpty(delimiters)) {
+ val patterns = delimiters.map { d =>
+ Pattern.compile(if (d == null) {
+ ""
+ } else {
+ d
+ })
+ }
+ DataFormat(delimiters, 0, patterns)
+ } else {
+ null
+ }
+ }
+
+ def isHighCardinalityColumn(columnCardinality: Int,
+ rowCount: Long,
+ model: DictionaryLoadModel): Boolean = {
+ (columnCardinality > model.highCardThreshold) &&
+ (rowCount > 0) &&
+ (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
+ }
+
+ /**
+ * create an instance of DictionaryLoadModel
+ *
+ * @param carbonLoadModel carbon load model
+ * @param table CarbonTableIdentifier
+ * @param dimensions column list
+ * @param hdfsLocation store location in HDFS
+ * @param dictfolderPath path of dictionary folder
+ */
+ def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
+ table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension],
+ hdfsLocation: String,
+ dictfolderPath: String,
+ forPreDefDict: Boolean): DictionaryLoadModel = {
+ val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
+ val isComplexes = new ArrayBuffer[Boolean]
+ for (i <- dimensions.indices) {
+ val dims = getPrimDimensionWithDict(carbonLoadModel, dimensions(i), forPreDefDict)
+ for (j <- dims.indices) {
+ primDimensionsBuffer += dims(j)
+ isComplexes += dimensions(i).isComplex
+ }
+ }
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
+ val primDimensions = primDimensionsBuffer.map { x => x }.toArray
+ val dictDetail = CarbonSparkFactory.getDictionaryDetailService().
+ getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+ val dictFilePaths = dictDetail.dictFilePaths
+ val dictFileExists = dictDetail.dictFileExists
+ val columnIdentifier = dictDetail.columnIdentifiers
+ val hdfsTempLocation = CarbonProperties.getInstance.
+ getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir"))
+ val lockType = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
+ val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
+ // load the high cardinality identification configuration
+ val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+ CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
+ val highCardThreshold = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+ CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
+ val rowCountPercentage = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+ CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
+
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ // get load count
+ if (null == carbonLoadModel.getLoadMetadataDetails) {
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
+ }
+ new DictionaryLoadModel(table,
+ dimensions,
+ hdfsLocation,
+ dictfolderPath,
+ dictFilePaths,
+ dictFileExists,
+ isComplexes.toArray,
+ primDimensions,
+ carbonLoadModel.getDelimiters,
+ highCardIdentifyEnable,
+ highCardThreshold,
+ rowCountPercentage,
+ columnIdentifier,
+ carbonLoadModel.getLoadMetadataDetails.size() == 0,
+ hdfsTempLocation,
+ lockType,
+ zookeeperUrl,
+ serializationNullFormat)
+ }
+
+ /**
+ * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
+ *
+ * @param sqlContext SQLContext
+ * @param carbonLoadModel carbon data load model
+ */
+ def loadDataFrame(sqlContext: SQLContext,
+ carbonLoadModel: CarbonLoadModel): DataFrame = {
+ val df = sqlContext.read
+ .format("com.databricks.spark.csv.newapi")
+ .option("header", {
+ if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+ "true"
+ } else {
+ "false"
+ }
+ })
+ .option("delimiter", {
+ if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
+ "" + DEFAULT_SEPARATOR
+ } else {
+ carbonLoadModel.getCsvDelimiter
+ }
+ })
+ .option("parserLib", "univocity")
+ .option("escape", carbonLoadModel.getEscapeChar)
+ .option("ignoreLeadingWhiteSpace", "false")
+ .option("ignoreTrailingWhiteSpace", "false")
+ .option("codec", "gzip")
+ .option("quote", {
+ if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
+ "" + DEFAULT_QUOTE_CHARACTER
+ } else {
+ carbonLoadModel.getQuoteChar
+ }
+ })
+ .option("comment", carbonLoadModel.getCommentChar)
+ .load(carbonLoadModel.getFactFilePath)
+ df
+ }
+
+ // Hack for spark2 integration
+ var updateTableMetadataFunc: (CarbonLoadModel, SQLContext, DictionaryLoadModel,
+ Array[CarbonDimension]) => Unit = _
+
+ /**
+ * check whether the global dictionary has been generated successfully or not
+ *
+ * @param status per-column generation status: (column index, load status, no-dictionary flag)
+ */
+ private def checkStatus(carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ model: DictionaryLoadModel,
+ status: Array[(Int, String, Boolean)]) = {
+ var result = false
+ val noDictionaryColumns = new ArrayBuffer[CarbonDimension]
+ val tableName = model.table.getTableName
+ status.foreach { x =>
+ val columnName = model.primDimensions(x._1).getColName
+ if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) {
+ result = true
+ LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed")
+ }
+ if (x._3) {
+ noDictionaryColumns += model.primDimensions(x._1)
+ }
+ }
+ if (noDictionaryColumns.nonEmpty) {
+ updateTableMetadataFunc(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray)
+ }
+ if (result) {
+ LOGGER.error("generate global dictionary files failed")
+ throw new Exception("Failed to generate global dictionary files")
+ } else {
+ LOGGER.info("generate global dictionary successfully")
+ }
+ }
+
+ /**
+ * get the external columns and their dictionary file paths
+ *
+ * @param colDictFilePath external column dict file path
+ * @param table table identifier
+ * @param dimensions dimension columns
+ */
+ private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel,
+ colDictFilePath: String,
+ table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension]) = {
+ val colFileMapArray = colDictFilePath.split(",")
+ for (colPathMap <- colFileMapArray) {
+ val colPathMapTrim = colPathMap.trim
+ val colNameWithPath = colPathMapTrim.split(":")
+ if (colNameWithPath.length == 1) {
+ LOGGER.error("the format of external column dictionary should be " +
+ "columnName:columnPath, please check")
+ throw new DataLoadingException("the format of predefined column dictionary" +
+ " should be columnName:columnPath, please check")
+ }
+ setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0),
+ FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1)))
+ }
+ }
+
+ /**
+ * set pre defined dictionary for dimension
+ *
+ * @param dimensions all the dimensions
+ * @param table carbon table identifier
+ * @param colName user specified column name for predefined dict
+ * @param colDictPath column dictionary file path
+ * @param parentDimName parent dimension for complex type
+ */
+ def setPredefineDict(carbonLoadModel: CarbonLoadModel,
+ dimensions: Array[CarbonDimension],
+ table: CarbonTableIdentifier,
+ colName: String,
+ colDictPath: String,
+ parentDimName: String = "") {
+ val middleDimName = colName.split("\\.")(0)
+ val dimParent = parentDimName + {
+ colName match {
+ case "" => colName
+ case _ =>
+ if (parentDimName.isEmpty) middleDimName else "." + middleDimName
+ }
+ }
+ // check whether the column exists
+ val preDictDimensionOption = dimensions.filter(
+ _.getColName.equalsIgnoreCase(dimParent))
+ if (preDictDimensionOption.length == 0) {
+ LOGGER.error(s"Column $dimParent is not a key column " +
+ s"in ${ table.getDatabaseName }.${ table.getTableName }")
+ throw new DataLoadingException(s"Column $dimParent is not a key column. " +
+ s"Only key column can be part of dictionary " +
+ s"and used in COLUMNDICT option.")
+ }
+ val preDictDimension = preDictDimensionOption(0)
+ if (preDictDimension.isComplex) {
+ val children = preDictDimension.getListOfChildDimensions.asScala.toArray
+ // for Array, the user specifies ArrayField: path, while ArrayField has a child dimension named val
+ val currentColName = {
+ preDictDimension.getDataType match {
+ case DataType.ARRAY =>
+ if (children(0).isComplex) {
+ "val." + colName.substring(middleDimName.length + 1)
+ } else {
+ "val"
+ }
+ case _ => colName.substring(middleDimName.length + 1)
+ }
+ }
+ setPredefineDict(carbonLoadModel, children, table, currentColName,
+ colDictPath, dimParent)
+ } else {
+ carbonLoadModel.setPredefDictMap(preDictDimension, colDictPath)
+ }
+ }
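+
+ // Recursion sketch (illustrative, hypothetical names): for a call such as
+ //   setPredefineDict(model, dims, table, "arrayField.city", "/dict/city.csv")
+ // where `arrayField` is an ARRAY of a complex type, the method matches `arrayField`,
+ // then recurses into its children with the column name rewritten to "val.city";
+ // for a primitive column it simply records the dictionary path via setPredefDictMap.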
+
+ /**
+ * use external dimension column to generate global dictionary
+ *
+ * @param colDictFilePath external column dict file path
+ * @param table table identifier
+ * @param dimensions dimension column
+ * @param carbonLoadModel carbon load model
+ * @param sqlContext spark sql context
+ * @param hdfsLocation store location on hdfs
+ * @param dictFolderPath generated global dict file path
+ */
+ private def generatePredefinedColDictionary(colDictFilePath: String,
+ table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension],
+ carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ hdfsLocation: String,
+ dictFolderPath: String) = {
+ // set predefined dictionary columns
+ setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
+ val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
+ hdfsLocation, dictFolderPath, forPreDefDict = true)
+ // new RDD to achieve distributed column dict generation
+ val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
+ sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
+ .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
+ val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
+ // check result status
+ checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
+ }
+
+ /**
+ * generate dimension parsers
+ *
+ * @param model dictionary load model
+ * @param distinctValuesList buffer that collects the distinct value set of each primitive dimension
+ * @return dimensionParsers
+ */
+ def createDimensionParsers(model: DictionaryLoadModel,
+ distinctValuesList: ArrayBuffer[(Int, HashSet[String])]): Array[GenericParser] = {
+ // local combine set
+ val dimNum = model.dimensions.length
+ val primDimNum = model.primDimensions.length
+ val columnValues = new Array[HashSet[String]](primDimNum)
+ val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
+ for (i <- 0 until primDimNum) {
+ columnValues(i) = new HashSet[String]
+ distinctValuesList += ((i, columnValues(i)))
+ mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
+ }
+ val dimensionParsers = new Array[GenericParser](dimNum)
+ for (j <- 0 until dimNum) {
+ dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
+ Some(model.dimensions(j)),
+ GlobalDictionaryUtil.createDataFormat(model.delimiters),
+ mapColumnValuesWithId).get
+ }
+ dimensionParsers
+ }
+
+ /**
+ * parse a record of the dictionary file and validate it
+ *
+ * @param x one record of the dictionary file, expected in the form "columnIndex,columnValue"
+ * @param accum accumulator counting bad records
+ * @param csvFileColumns column names of the csv file, indexed by position
+ */
+ private def parseRecord(x: String, accum: Accumulator[Int],
+ csvFileColumns: Array[String]): (String, String) = {
+ val tokens = x.split("" + DEFAULT_SEPARATOR)
+ var columnName: String = ""
+ var value: String = ""
+ // a record such as "," or "" is a bad record
+ if (tokens.isEmpty) {
+ LOGGER.error("Read a bad dictionary record: " + x)
+ accum += 1
+ } else if (tokens.size == 1) {
+ // a single token such as "1" or "jone" without a separator is a bad record
+ if (!x.contains(",")) {
+ accum += 1
+ } else {
+ try {
+ columnName = csvFileColumns(tokens(0).toInt)
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Read a bad dictionary record: " + x)
+ accum += 1
+ }
+ }
+ } else {
+ try {
+ columnName = csvFileColumns(tokens(0).toInt)
+ value = tokens(1)
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Read a bad dictionary record: " + x)
+ accum += 1
+ }
+ }
+ (columnName, value)
+ }
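+
+ // Behaviour sketch (illustrative values): with csvFileColumns = Array("id", "country"),
+ //   parseRecord("1,USA", accum, csvFileColumns) returns ("country", "USA")
+ //   parseRecord("country", accum, csvFileColumns) increments accum and returns ("", "")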
+
+ /**
+ * read local dictionary files and prune to the required columns
+ *
+ * @param sqlContext spark sql context
+ * @param csvFileColumns column names of the csv file
+ * @param requireColumns columns that require a global dictionary
+ * @param allDictionaryPath path of the pre-generated dictionary files
+ * @param accumulator accumulator counting bad dictionary records
+ * @return allDictionaryRdd rdd of (columnName, distinct values)
+ */
+ private def readAllDictionaryFiles(sqlContext: SQLContext,
+ csvFileColumns: Array[String],
+ requireColumns: Array[String],
+ allDictionaryPath: String,
+ accumulator: Accumulator[Int]) = {
+ var allDictionaryRdd: RDD[(String, Iterable[String])] = null
+ try {
+ // read the local dictionary files and split each record into (columnName, columnValue)
+ val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
+ .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
+
+ // group by column name and keep only the required columns
+ val requireColumnsList = requireColumns.toList
+ allDictionaryRdd = basicRdd
+ .groupByKey()
+ .filter(x => requireColumnsList.contains(x._1))
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Read dictionary files failed. Caused by: " + ex.getMessage)
+ throw ex
+ }
+ allDictionaryRdd
+ }
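+
+ // Result sketch (illustrative): every line of the dictionary files is expected to look
+ // like "columnIndex,columnValue"; after parsing and grouping, the returned RDD holds
+ // entries such as ("country", Iterable("USA", "China")) for each column in requireColumns.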
+
+ /**
+ * validate the local dictionary files
+ *
+ * @param allDictionaryPath path (or wildcard) of the pre-generated dictionary files
+ * @return true if at least one non-empty dictionary file exists
+ */
+ private def validateAllDictionaryPath(allDictionaryPath: String): Boolean = {
+ val fileType = FileFactory.getFileType(allDictionaryPath)
+ val filePath = FileFactory.getCarbonFile(allDictionaryPath, fileType)
+ // the file path may contain a wildcard, e.g. "/path/*.dictionary"
+ if (filePath.getName.startsWith("*")) {
+ val dictExt = filePath.getName.substring(1)
+ if (filePath.getParentFile.exists()) {
+ val listFiles = filePath.getParentFile.listFiles()
+ if (listFiles.exists(file =>
+ file.getName.endsWith(dictExt) && file.getSize > 0)) {
+ true
+ } else {
+ LOGGER.warn("No dictionary files found or empty dictionary files! " +
+ "Won't generate new dictionary.")
+ false
+ }
+ } else {
+ throw new FileNotFoundException(
+ "The given dictionary file path is not found!")
+ }
+ } else {
+ if (filePath.exists()) {
+ if (filePath.getSize > 0) {
+ true
+ } else {
+ LOGGER.warn("No dictionary files found or empty dictionary files! " +
+ "Won't generate new dictionary.")
+ false
+ }
+ } else {
+ throw new FileNotFoundException(
+ "The given dictionary file path is not found!")
+ }
+ }
+ }
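+
+ // Path sketch (illustrative): allDictionaryPath may be a single file such as
+ //   /dicts/all.dictionary
+ // or a wildcard such as
+ //   /dicts/*.dictionary
+ // in which case any non-empty file under /dicts ending with ".dictionary" is accepted.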
+
+ /**
+ * get the file header from the fact file
+ *
+ * @param carbonLoadModel carbon data load model
+ * @return headers column names read from the first line of the fact file
+ */
+ private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = {
+ var headers: Array[String] = null
+ val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0)
+ val readLine = CarbonUtil.readHeader(factFile)
+
+ if (null != readLine) {
+ val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
+ "" + DEFAULT_SEPARATOR
+ } else {
+ carbonLoadModel.getCsvDelimiter
+ }
+ headers = readLine.toLowerCase().split(delimiter)
+ } else {
+ LOGGER.error("Not found file header! Please set fileheader")
+ throw new IOException("Failed to get file header")
+ }
+ headers
+ }
+
+ /**
+ * generate global dictionary with SQLContext and CarbonLoadModel
+ *
+ * @param sqlContext sql context
+ * @param carbonLoadModel carbon load model
+ * @param storePath carbon store location path
+ * @param dataFrame optional DataFrame to build the dictionary from instead of the fact files
+ */
+ def generateGlobalDictionary(sqlContext: SQLContext,
+ carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ dataFrame: Option[DataFrame] = None): Unit = {
+ try {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ // create dictionary folder if not exists
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
+ // columns which need to generate global dictionary file
+ val dimensions = carbonTable.getDimensionByTableName(
+ carbonTable.getFactTableName).asScala.toArray
+ // generate global dict from pre defined column dict file
+ carbonLoadModel.initPredefDictMap()
+
+ val allDictionaryPath = carbonLoadModel.getAllDictPath
+ if (StringUtils.isEmpty(allDictionaryPath)) {
+ LOGGER.info("Generate global dictionary from source data files!")
+ // load data by using dataSource com.databricks.spark.csv
+ var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
+ var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+ df.columns
+ } else {
+ carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
+ }
+ headers = headers.map(headerName => headerName.trim)
+ val colDictFilePath = carbonLoadModel.getColDictFilePath
+ if (colDictFilePath != null) {
+ // generate predefined dictionary
+ generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+ dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
+ }
+ if (headers.length > df.columns.length) {
+ val msg = "The number of columns in the file header does not match the " +
+ "number of columns in the data file; either the delimiter " +
+ "or the fileheader provided is not correct"
+ LOGGER.error(msg)
+ throw new DataLoadingException(msg)
+ }
+ // use fact file to generate global dict
+ val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
+ headers, df.columns)
+ if (requireDimension.nonEmpty) {
+ // select column to push down pruning
+ df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
+ val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+ requireDimension, storePath, dictfolderPath, false)
+ // combine distinct value in a block and partition by column
+ val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
+ .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+ // generate global dictionary files
+ val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+ // check result status
+ checkStatus(carbonLoadModel, sqlContext, model, statusList)
+ } else {
+ LOGGER.info("No column found for generating global dictionary in source data files")
+ }
+ // generate global dict from dimension file
+ if (carbonLoadModel.getDimFolderPath != null) {
+ val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
+ for (fileMap <- fileMapArray) {
+ val dimTableName = fileMap.split(":")(0)
+ var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
+ val (requireDimensionForDim, requireColumnNamesForDim) =
+ pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
+ if (requireDimensionForDim.length >= 1) {
+ dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
+ requireColumnNamesForDim.tail: _*)
+ val modelforDim = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+ requireDimensionForDim, storePath, dictfolderPath, false)
+ val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
+ dimDataframe.rdd, modelforDim)
+ .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
+ val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
+ inputRDDforDim, modelforDim).collect()
+ checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
+ } else {
+ LOGGER.info(s"No columns in dimension table $dimTableName " +
+ "to generate global dictionary")
+ }
+ }
+ }
+ } else {
+ LOGGER.info("Generate global dictionary from dictionary files!")
+ val isNonempty = validateAllDictionaryPath(allDictionaryPath)
+ if (isNonempty) {
+ var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+ getHeaderFormFactFile(carbonLoadModel)
+ } else {
+ carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
+ }
+ headers = headers.map(headerName => headerName.trim)
+ // prune the dimension columns according to the CSV file header
+ val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
+ if (requireDimension.nonEmpty) {
+ val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+ requireDimension, storePath, dictfolderPath, false)
+ // check if the dictionary files contain bad records
+ val accumulator = sqlContext.sparkContext.accumulator(0)
+ // read local dictionary file, and group by key
+ val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
+ requireColumnNames, allDictionaryPath, accumulator)
+ // read the existing dictionary values and combine
+ val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
+ .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+ // generate global dictionary files
+ val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+ // check result status
+ checkStatus(carbonLoadModel, sqlContext, model, statusList)
+ // if the dictionary files contain wrongly formatted records, throw an exception
+ if (accumulator.value > 0) {
+ throw new DataLoadingException("Data Loading failure, dictionary values are " +
+ "not in correct format!")
+ }
+ } else {
+ LOGGER.info("have no column need to generate global dictionary")
+ }
+ }
+ }
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "generate global dictionary failed")
+ throw ex
+ }
+ }
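+
+ // End-to-end sketch (illustrative, assumes `loadModel` has been populated by the load
+ // command and `storePath` points at the carbon store):
+ //   generateGlobalDictionary(sqlContext, loadModel, storePath)
+ // builds the dictionaries from the source files (or the passed DataFrame) when no
+ // allDictionaryPath is configured, and from the pre-generated dictionary files otherwise.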
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
new file mode 100644
index 0000000..79c0cc8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+
+/**
+ * Implicit functions for [[TableIdentifier]]
+ */
+object CarbonTableIdentifierImplicit {
+ def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+
+ implicit def toTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+ tableIdentifier match {
+ case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
+ case Seq(tableName) => TableIdentifier(tableName, None)
+ case _ => throw new IllegalArgumentException("invalid table identifier: " + tableIdentifier)
+ }
+ }
+
+ implicit def toSequence(tableIdentifier: TableIdentifier): Seq[String] = {
+ tableIdentifier.database match {
+ case Some(dbName) => Seq(dbName, tableIdentifier.table)
+ case _ => Seq(tableIdentifier.table)
+ }
+ }
+}
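+
+// Usage sketch (illustrative names): with the implicit conversions in scope,
+//   import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
+//   val id: TableIdentifier = Seq("default", "sales")   // becomes TableIdentifier("sales", Some("default"))
+//   val parts: Seq[String] = TableIdentifier("sales")   // becomes Seq("sales")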