Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:46 UTC
[08/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 0000000..9360ad8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.SQLContext
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.CarbonSparkFactory
+import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
+
+case class TableModel(
+ ifNotExistsSet: Boolean,
+ var databaseName: String,
+ databaseNameOp: Option[String],
+ tableName: String,
+ tableProperties: Map[String, String],
+ dimCols: Seq[Field],
+ msrCols: Seq[Field],
+ highcardinalitydims: Option[Seq[String]],
+ noInvertedIdxCols: Option[Seq[String]],
+ columnGroups: Seq[String],
+ colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
+
+case class Field(column: String, var dataType: Option[String], name: Option[String],
+ children: Option[List[Field]], parent: String = null,
+ storeType: Option[String] = Some("columnar"),
+ var precision: Int = 0, var scale: Int = 0)
+
+case class ColumnProperty(key: String, value: String)
+
+case class ComplexField(complexType: String, primitiveField: Option[Field],
+ complexField: Option[ComplexField])
+
+case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
+ nodeList: Array[String])
+
+case class PartitionerField(partitionColumn: String, dataType: Option[String],
+ columnComment: String)
+
+case class DataLoadTableFileMapping(table: String, loadPath: String)
+
+case class CarbonMergerMapping(storeLocation: String,
+ storePath: String,
+ metadataFilePath: String,
+ mergedLoadName: String,
+ kettleHomePath: String,
+ tableCreationTime: Long,
+ databaseName: String,
+ factTableName: String,
+ validSegments: Array[String],
+ tableId: String,
+ // maxSegmentColCardinality is the cardinality of the last segment of the compaction
+ var maxSegmentColCardinality: Array[Int],
+ // maxSegmentColumnSchemaList is the list of column schemas of the last segment of the compaction
+ var maxSegmentColumnSchemaList: List[ColumnSchema])
+
+case class NodeInfo(TaskId: String, noOfBlocks: Int)
+
+case class AlterTableModel(dbName: Option[String], tableName: String,
+ compactionType: String, alterSql: String)
+
+case class CompactionModel(compactionSize: Long,
+ compactionType: CompactionType,
+ carbonTable: CarbonTable,
+ tableCreationTime: Long,
+ isDDLTrigger: Boolean)
+
+case class CompactionCallableModel(storePath: String,
+ carbonLoadModel: CarbonLoadModel,
+ storeLocation: String,
+ carbonTable: CarbonTable,
+ kettleHomePath: String,
+ cubeCreationTime: Long,
+ loadsToMerge: util.List[LoadMetadataDetails],
+ sqlContext: SQLContext,
+ compactionType: CompactionType)
+
+object TableNewProcessor {
+ def apply(cm: TableModel, sqlContext: SQLContext): TableInfo = {
+ new TableNewProcessor(cm, sqlContext).process
+ }
+}
+
+class TableNewProcessor(cm: TableModel, sqlContext: SQLContext) {
+
+ var index = 0
+ var rowGroup = 0
+
+ def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
+ var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
+ fieldChildren.foreach(fields => {
+ fields.foreach(field => {
+ val encoders = new java.util.ArrayList[Encoding]()
+ encoders.add(Encoding.DICTIONARY)
+ val columnSchema: ColumnSchema = getColumnSchema(
+ DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+ field.name.getOrElse(field.column), index,
+ isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale)
+ allColumns ++= Seq(columnSchema)
+ index = index + 1
+ rowGroup = rowGroup + 1
+ if (field.children.isDefined && field.children.get != null) {
+ columnSchema.setNumberOfChild(field.children.get.size)
+ allColumns ++= getAllChildren(field.children)
+ }
+ })
+ })
+ allColumns
+ }
+
+ def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
+ encoders: java.util.List[Encoding], isDimensionCol: Boolean,
+ colGroup: Integer, precision: Integer, scale: Integer): ColumnSchema = {
+ val columnSchema = new ColumnSchema()
+ columnSchema.setDataType(dataType)
+ columnSchema.setColumnName(colName)
+ val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+ if (highCardinalityDims.contains(colName)) {
+ encoders.remove(Encoding.DICTIONARY)
+ }
+ if (dataType == DataType.TIMESTAMP) {
+ encoders.add(Encoding.DIRECT_DICTIONARY)
+ }
+ val colPropMap = new java.util.HashMap[String, String]()
+ if (cm.colProps.isDefined && null != cm.colProps.get.get(colName)) {
+ val colProps = cm.colProps.get.get(colName)
+ colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
+ }
+ columnSchema.setColumnProperties(colPropMap)
+ columnSchema.setEncodingList(encoders)
+ val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+ val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
+ columnSchema)
+ columnSchema.setColumnUniqueId(columnUniqueId)
+ columnSchema.setColumnReferenceId(columnUniqueId)
+ columnSchema.setColumnar(isCol)
+ columnSchema.setDimensionColumn(isDimensionCol)
+ columnSchema.setColumnGroup(colGroup)
+ columnSchema.setPrecision(precision)
+ columnSchema.setScale(scale)
+ // TODO: Need to fill RowGroupID, converted type
+ // & Number of Children after DDL finalization
+ columnSchema
+ }
+
+ // process the CREATE TABLE DDL fields and build the wrapper TableInfo object
+ def process: TableInfo = {
+ val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
+ var allColumns = Seq[ColumnSchema]()
+ var index = 0
+ cm.dimCols.foreach(field => {
+ val encoders = new java.util.ArrayList[Encoding]()
+ encoders.add(Encoding.DICTIONARY)
+ val columnSchema: ColumnSchema = getColumnSchema(
+ DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+ field.name.getOrElse(field.column),
+ index,
+ isCol = true,
+ encoders,
+ isDimensionCol = true,
+ -1,
+ field.precision,
+ field.scale)
+ allColumns ++= Seq(columnSchema)
+ index = index + 1
+ if (field.children.isDefined && field.children.get != null) {
+ columnSchema.setNumberOfChild(field.children.get.size)
+ allColumns ++= getAllChildren(field.children)
+ }
+ })
+
+ cm.msrCols.foreach(field => {
+ val encoders = new java.util.ArrayList[Encoding]()
+ val columnSchema: ColumnSchema = getColumnSchema(
+ DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+ field.name.getOrElse(field.column),
+ index,
+ isCol = true,
+ encoders,
+ isDimensionCol = false,
+ -1,
+ field.precision,
+ field.scale)
+ val measureCol = columnSchema
+
+ allColumns ++= Seq(measureCol)
+ index = index + 1
+ })
+
+ // Check whether there are any duplicate measures or dimensions,
+ // based on the dimension and measure names.
+ allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
+ val name = f._1
+ LOGGER.error(s"Duplicate column found with name: $name")
+ LOGGER.audit(
+ s"Validation failed for Create/Alter Table Operation " +
+ s"for ${ cm.databaseName }.${ cm.tableName }" +
+ s"Duplicate column found with name: $name")
+ sys.error(s"Duplicate dimensions found with name: $name")
+ })
+
+ val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+
+ checkColGroupsValidity(cm.columnGroups, allColumns, highCardinalityDims)
+
+ updateColumnGroupsInFields(cm.columnGroups, allColumns)
+
+ var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
+ val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
+ val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
+ for (column <- allColumns) {
+ if (highCardinalityDims.contains(column.getColumnName)) {
+ newOrderedDims += column
+ } else if (column.isComplex) {
+ complexDims += column
+ } else if (column.isDimensionColumn) {
+ newOrderedDims += column
+ } else {
+ measures += column
+ }
+
+ }
+
+ // Setting the boolean value of useInvertedIndex in column schema
+ val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
+ for (column <- allColumns) {
+ // When the column is a measure, or is specified as a no-inverted-index column in the DDL,
+ // set useInvertedIndex to false; otherwise set it to true.
+ if (noInvertedIndexCols.contains(column.getColumnName) ||
+ cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
+ column.setUseInvertedIndex(false)
+ } else {
+ column.setUseInvertedIndex(true)
+ }
+ }
+
+ // Adding dummy measure if no measure is provided
+ if (measures.size < 1) {
+ val encoders = new java.util.ArrayList[Encoding]()
+ val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
+ CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
+ index,
+ true,
+ encoders,
+ false,
+ -1, 0, 0)
+ columnSchema.setInvisible(true)
+ val measureColumn = columnSchema
+ measures += measureColumn
+ allColumns = allColumns ++ measures
+ }
+ val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
+ columnValidator.validateColumns(allColumns)
+ newOrderedDims = newOrderedDims ++ complexDims ++ measures
+
+ val tableInfo = new TableInfo()
+ val tableSchema = new TableSchema()
+ val schemaEvol = new SchemaEvolution()
+ schemaEvol.setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
+ tableSchema.setTableId(UUID.randomUUID().toString)
+ // populate table properties map
+ val tablePropertiesMap = new java.util.HashMap[String, String]()
+ cm.tableProperties.foreach {
+ x => tablePropertiesMap.put(x._1, x._2)
+ }
+ tableSchema.setTableProperties(tablePropertiesMap)
+ tableSchema.setTableName(cm.tableName)
+ tableSchema.setListOfColumns(allColumns.asJava)
+ tableSchema.setSchemaEvalution(schemaEvol)
+ tableInfo.setDatabaseName(cm.databaseName)
+ tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
+ tableInfo.setLastUpdatedTime(System.currentTimeMillis())
+ tableInfo.setFactTable(tableSchema)
+ tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]())
+ tableInfo
+ }
+
+ // Check that the columns specified in column groups are present in the field list.
+ protected def checkColGroupsValidity(colGrps: Seq[String],
+ allCols: Seq[ColumnSchema],
+ highCardCols: Seq[String]): Unit = {
+ if (null != colGrps) {
+ colGrps.foreach(columngroup => {
+ val rowCols = columngroup.split(",")
+ rowCols.foreach(colForGrouping => {
+ var found: Boolean = false
+ // check for dimensions + measures
+ allCols.foreach(eachCol => {
+ if (eachCol.getColumnName.equalsIgnoreCase(colForGrouping.trim())) {
+ found = true
+ }
+ })
+ // check the no-dictionary dimensions
+ highCardCols.foreach(noDicCol => {
+ if (colForGrouping.trim.equalsIgnoreCase(noDicCol)) {
+ found = true
+ }
+ })
+
+ if (!found) {
+ sys.error(s"column $colForGrouping is not present in Field list")
+ }
+ })
+ })
+ }
+ }
+
+ // Update the column group details for the fields.
+ private def updateColumnGroupsInFields(colGrps: Seq[String], allCols: Seq[ColumnSchema]): Unit = {
+ if (null != colGrps) {
+ var colGroupId = -1
+ colGrps.foreach(columngroup => {
+ colGroupId += 1
+ val rowCols = columngroup.split(",")
+ rowCols.foreach(row => {
+
+ allCols.foreach(eachCol => {
+
+ if (eachCol.getColumnName.equalsIgnoreCase(row.trim)) {
+ eachCol.setColumnGroup(colGroupId)
+ eachCol.setColumnar(false)
+ }
+ })
+ })
+ })
+ }
+ }
+}
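For illustration, a minimal self-contained sketch of the duplicate-column validation performed in TableNewProcessor.process above, with plain strings standing in for ColumnSchema (the object name and column names here are illustrative, not from the patch):

    object DuplicateColumnCheck {
      def main(args: Array[String]): Unit = {
        val columnNames = Seq("id", "name", "id")
        // Group by column name; any group with more than one entry is a duplicate.
        columnNames.groupBy(identity).foreach { case (name, occurrences) =>
          if (occurrences.size > 1) {
            sys.error(s"Duplicate column found with name: $name")
          }
        }
      }
    }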
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
new file mode 100644
index 0000000..e08660c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+object DistributionUtil {
+ @transient
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /*
+ * This method returns the list of executors in the cluster.
+ * It takes the memory status of all nodes via getExecutorMemoryStatus
+ * and extracts the keys; getExecutorMemoryStatus also includes the driver,
+ * and in client mode the driver runs on localhost. Executors may also be
+ * spawned on the driver node, so only the first occurrence of localhost is
+ * removed when deriving the executor list.
+ */
+ def getNodeList(sparkContext: SparkContext): Array[String] = {
+ val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
+ kv._1.split(":")(0)
+ }.toSeq
+ val localhostIPs = getLocalhostIPs
+ val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
+
+ val nodelist: List[String] = withoutDriverIP(arr.toList)(selectedLocalIPList.contains(_))
+ val masterMode = sparkContext.getConf.get("spark.master")
+ if (nodelist.nonEmpty) {
+ // Specific for Yarn Mode
+ if ("yarn-cluster".equals(masterMode) || "yarn-client".equals(masterMode)) {
+ val nodeNames = nodelist.map { x =>
+ val addr = InetAddress.getByName(x)
+ addr.getHostName
+ }
+ nodeNames.toArray
+ } else {
+ // For Standalone cluster, node IPs will be returned.
+ nodelist.toArray
+ }
+ } else {
+ Seq(InetAddress.getLocalHost.getHostName).toArray
+ }
+ }
+
+ private def getLocalhostIPs = {
+ val iface = NetworkInterface.getNetworkInterfaces
+ var addresses: List[InterfaceAddress] = List.empty
+ while (iface.hasMoreElements) {
+ addresses = iface.nextElement().getInterfaceAddresses.asScala.toList ++ addresses
+ }
+ val inets = addresses.map(_.getAddress.getHostAddress)
+ inets
+ }
+
+ /*
+ * This method removes the first occurrence of any of the IPs matched by the predicate.
+ * E.g. l = List(Master,slave1,Master,slave2,slave3) is the list of nodes where the first
+ * Master is the driver node.
+ * Calling withoutDriverIP(l)(x => x == "Master") removes the first occurrence of Master.
+ * The resulting list is List(slave1,Master,slave2,slave3)
+ */
+ def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = {
+ xs match {
+ case x :: rest => if (p(x)) {
+ rest
+ } else {
+ x :: withoutDriverIP(rest)(p)
+ }
+ case _ => Nil
+ }
+ }
+
+ /**
+ *
+ * Ensures that executors are available and returns the node list. If the number of
+ * nodes holding data exceeds the configured executors, the configured count is used.
+ *
+ * @param blockList
+ * @param sparkContext
+ * @return
+ */
+ def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
+ sparkContext: SparkContext): Seq[String] = {
+ val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+ ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
+ }
+
+ def ensureExecutorsByNumberAndGetNodeList(nodesOfData: Int,
+ sparkContext: SparkContext): Seq[String] = {
+ var confExecutorsTemp: String = null
+ if (sparkContext.getConf.contains("spark.executor.instances")) {
+ confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
+ } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
+ && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+ .equalsIgnoreCase("true")) {
+ if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
+ confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+ }
+ }
+
+ val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+ val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
+ confExecutors
+ } else {
+ nodesOfData
+ }
+
+ val startTime = System.currentTimeMillis()
+ ensureExecutors(sparkContext, requiredExecutors)
+ var nodes = DistributionUtil.getNodeList(sparkContext)
+ var maxTimes = 30
+ while (nodes.length < requiredExecutors && maxTimes > 0) {
+ Thread.sleep(500)
+ nodes = DistributionUtil.getNodeList(sparkContext)
+ maxTimes = maxTimes - 1
+ }
+ val timDiff = System.currentTimeMillis() - startTime
+ LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
+ LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
+ nodes.distinct
+ }
+
+ /**
+ * Requesting the extra executors other than the existing ones.
+ *
+ * @param sc
+ * @param numExecutors
+ * @return
+ */
+ def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
+ sc.schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ val requiredExecutors = numExecutors - b.numExistingExecutors
+ LOGGER.info(s"number of executors is =$numExecutors existing executors are =" +
+ s"${ b.numExistingExecutors }")
+ if (requiredExecutors > 0) {
+ b.requestExecutors(requiredExecutors)
+ }
+ true
+ case _ =>
+ false
+ }
+ }
+}
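As a runnable sketch of the withoutDriverIP behaviour documented above (the function body is copied from the patch; the wrapper object and node names follow the example in its comment):

    object WithoutDriverIPExample {
      def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = xs match {
        case x :: rest => if (p(x)) rest else x :: withoutDriverIP(rest)(p)
        case _ => Nil
      }

      def main(args: Array[String]): Unit = {
        val nodes = List("Master", "slave1", "Master", "slave2", "slave3")
        // Only the first "Master" (the driver) is dropped:
        // prints List(slave1, Master, slave2, slave3)
        println(withoutDriverIP(nodes)(_ == "Master"))
      }
    }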
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
new file mode 100644
index 0000000..7909a13
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.optimizer
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical._
+
+abstract class AbstractNode
+
+case class Node(cd: CarbonDictionaryTempDecoder) extends AbstractNode
+
+case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
+ extends AbstractNode
+
+case class CarbonDictionaryTempDecoder(
+ attrList: util.Set[AttributeReferenceWrapper],
+ attrsNotDecode: util.Set[AttributeReferenceWrapper],
+ child: LogicalPlan,
+ isOuter: Boolean = false) extends UnaryNode {
+ var processed = false
+
+ def getAttrsNotDecode: util.Set[Attribute] = {
+ val set = new util.HashSet[Attribute]()
+ attrsNotDecode.asScala.foreach(f => set.add(f.attr))
+ set
+ }
+
+ def getAttrList: util.Set[Attribute] = {
+ val set = new util.HashSet[Attribute]()
+ attrList.asScala.foreach(f => set.add(f.attr))
+ set
+ }
+
+ override def output: Seq[Attribute] = child.output
+}
+
+class CarbonDecoderProcessor {
+
+ def getDecoderList(plan: LogicalPlan): util.List[AbstractNode] = {
+ val nodeList = new util.ArrayList[AbstractNode]
+ process(plan, nodeList)
+ nodeList
+ }
+
+ private def process(plan: LogicalPlan, nodeList: util.List[AbstractNode]): Unit = {
+ plan match {
+ case cd: CarbonDictionaryTempDecoder =>
+ nodeList.add(Node(cd))
+ process(cd.child, nodeList)
+ case j: BinaryNode =>
+ val leftList = new util.ArrayList[AbstractNode]
+ val rightList = new util.ArrayList[AbstractNode]
+ nodeList.add(BinaryCarbonNode(leftList, rightList))
+ process(j.left, leftList)
+ process(j.right, rightList)
+ case e: UnaryNode => process(e.child, nodeList)
+ case _ =>
+ }
+ }
+
+ def updateDecoders(nodeList: util.List[AbstractNode]): Unit = {
+ val scalaList = nodeList.asScala
+ val decoderNotDecode = new util.HashSet[AttributeReferenceWrapper]
+ updateDecoderInternal(scalaList, decoderNotDecode)
+ }
+
+ private def updateDecoderInternal(scalaList: mutable.Buffer[AbstractNode],
+ decoderNotDecode: util.HashSet[AttributeReferenceWrapper]): Unit = {
+ scalaList.reverseMap {
+ case Node(cd: CarbonDictionaryTempDecoder) =>
+ decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
+ decoderNotDecode.asScala.foreach(cd.attrList.remove)
+ decoderNotDecode.addAll(cd.attrList)
+ case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
+ val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
+ val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
+ updateDecoderInternal(left.asScala, leftNotDecode)
+ updateDecoderInternal(right.asScala, rightNotDecode)
+ decoderNotDecode.addAll(leftNotDecode)
+ decoderNotDecode.addAll(rightNotDecode)
+ }
+ }
+
+}
+
+case class AttributeReferenceWrapper(attr: Attribute) {
+
+ override def equals(other: Any): Boolean = other match {
+ case ar: AttributeReferenceWrapper =>
+ attr.name.equalsIgnoreCase(ar.attr.name) && attr.exprId == ar.attr.exprId
+ case _ => false
+ }
+
+ // constant hash value
+ lazy val hash = (attr.name.toLowerCase + "." + attr.exprId.id).hashCode
+ override def hashCode: Int = hash
+}
+
+case class Marker(set: util.Set[AttributeReferenceWrapper], binary: Boolean = false)
+
+class CarbonPlanMarker {
+ val markerStack = new util.Stack[Marker]
+ var joinCount = 0
+
+ def pushMarker(attrs: util.Set[AttributeReferenceWrapper]): Unit = {
+ markerStack.push(Marker(attrs))
+ }
+
+ def pushBinaryMarker(attrs: util.Set[AttributeReferenceWrapper]): Unit = {
+ markerStack.push(Marker(attrs, binary = true))
+ joinCount = joinCount + 1
+ }
+
+ def revokeJoin(): util.Set[AttributeReferenceWrapper] = {
+ if (joinCount > 0) {
+ while (!markerStack.empty()) {
+ val marker = markerStack.pop()
+ if (marker.binary) {
+ joinCount = joinCount - 1
+ return marker.set
+ }
+ }
+ }
+ markerStack.peek().set
+ }
+
+}
+
+
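The case-insensitive equality contract of AttributeReferenceWrapper can be exercised in isolation. A minimal sketch, with a local SimpleAttr standing in for Spark's Attribute (SimpleAttr, AttrWrapper and the attribute names are hypothetical):

    case class SimpleAttr(name: String, exprId: Long)

    case class AttrWrapper(attr: SimpleAttr) {
      override def equals(other: Any): Boolean = other match {
        case w: AttrWrapper =>
          attr.name.equalsIgnoreCase(w.attr.name) && attr.exprId == w.attr.exprId
        case _ => false
      }
      // constant hash value, consistent with the case-insensitive equals
      lazy val hash = (attr.name.toLowerCase + "." + attr.exprId).hashCode
      override def hashCode: Int = hash
    }

    object AttrWrapperExample {
      def main(args: Array[String]): Unit = {
        val set = new java.util.HashSet[AttrWrapper]()
        set.add(AttrWrapper(SimpleAttr("Country", 1L)))
        set.add(AttrWrapper(SimpleAttr("country", 1L)))
        println(set.size()) // 1: the two wrappers compare equal
      }
    }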
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
new file mode 100644
index 0000000..387d1ef
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.processing.etl.DataLoadingException
+
+object FileUtils {
+ /**
+ * append all CSV file paths to a String, separated by commas
+ */
+ private def getPathsFromCarbonFile(carbonFile: CarbonFile, stringBuild: StringBuilder): Unit = {
+ if (carbonFile.isDirectory) {
+ val files = carbonFile.listFiles()
+ for (j <- 0 until files.size) {
+ getPathsFromCarbonFile(files(j), stringBuild)
+ }
+ } else {
+ val path = carbonFile.getAbsolutePath
+ val fileName = carbonFile.getName
+ if (carbonFile.getSize == 0) {
+ LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ .warn(s"skip empty input file: $path")
+ } else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
+ fileName.startsWith(CarbonCommonConstants.POINT)) {
+ LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ .warn(s"skip invisible input file: $path")
+ } else {
+ stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
+ }
+ }
+ }
+
+ /**
+ * append all file paths under inputPath to a String, separated by commas
+ *
+ */
+ def getPaths(inputPath: String): String = {
+ if (inputPath == null || inputPath.isEmpty) {
+ throw new DataLoadingException("Input file path cannot be empty.")
+ } else {
+ val stringBuild = new StringBuilder()
+ val filePaths = inputPath.split(",")
+ for (i <- 0 until filePaths.size) {
+ val fileType = FileFactory.getFileType(filePaths(i))
+ val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType)
+ if (!carbonFile.exists()) {
+ throw new DataLoadingException(s"The input file does not exist: ${filePaths(i)}" )
+ }
+ getPathsFromCarbonFile(carbonFile, stringBuild)
+ }
+ if (stringBuild.nonEmpty) {
+ stringBuild.substring(0, stringBuild.size - 1)
+ } else {
+ throw new DataLoadingException("Please check your input path and make sure " +
+ "that files end with '.csv' and content is not empty.")
+ }
+ }
+ }
+
+ def getSpaceOccupied(inputPath: String): Long = {
+ var size : Long = 0
+ if (inputPath == null || inputPath.isEmpty) {
+ size
+ } else {
+ val filePaths = inputPath.split(",")
+ for (i <- 0 until filePaths.size) {
+ val fileType = FileFactory.getFileType(filePaths(i))
+ val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType)
+ size = size + carbonFile.getSize
+ }
+ size
+ }
+ }
+
+}
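The comma-joined path string built by getPaths can be sketched without CarbonFile: each path is appended with a trailing comma and the final comma is trimmed, mirroring the StringBuilder logic above (the object name and paths are illustrative):

    object PathJoinExample {
      def main(args: Array[String]): Unit = {
        val paths = Seq("/data/a.csv", "/data/b.csv")
        val sb = new StringBuilder
        // Normalise separators and append each path followed by a comma.
        paths.foreach(p => sb.append(p.replace('\\', '/')).append(","))
        val joined = if (sb.nonEmpty) sb.substring(0, sb.length - 1) else ""
        println(joined) // /data/a.csv,/data/b.csv
      }
    }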
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
new file mode 100644
index 0000000..516ba58
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.tools.reflect.ToolBox
+
+/**
+ * It compiles the code dynamically at runtime and returns the object
+ */
+object ScalaCompilerUtil {
+
+ def compiledCode(code: String): Any = {
+ import scala.reflect.runtime.universe._
+ val cm = runtimeMirror(Utils.getContextOrSparkClassLoader)
+ val toolbox = cm.mkToolBox()
+
+ val tree = toolbox.parse(code)
+ toolbox.compile(tree)()
+ }
+}
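A self-contained sketch of the parse/compile/evaluate pattern used by compiledCode, with the current class loader standing in for Utils.getContextOrSparkClassLoader (assumes scala-compiler and scala-reflect are on the classpath):

    import scala.reflect.runtime.universe.runtimeMirror
    import scala.tools.reflect.ToolBox

    object ToolBoxExample {
      def main(args: Array[String]): Unit = {
        val toolbox = runtimeMirror(getClass.getClassLoader).mkToolBox()
        // Parse the source string into a tree, compile it, then evaluate the thunk.
        val tree = toolbox.parse("(x: Int) => x * 2")
        val doubler = toolbox.compile(tree)().asInstanceOf[Int => Int]
        println(doubler(21)) // 42
      }
    }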
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
new file mode 100644
index 0000000..e2f4c3e
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
+
+import org.apache.carbondata.core.load.BlockDetails
+
+/*
+ * This object is used to handle file splits.
+ */
+object SparkUtil {
+
+ def setTaskContext(context: TaskContext): Unit = {
+ val localThreadContext = TaskContext.get()
+ if (localThreadContext == null) {
+ TaskContext.setTaskContext(context)
+ }
+ }
+
+ /**
+ * Get the file splits and return Array[BlockDetails]; if the file path is empty, return an empty array.
+ *
+ */
+ def getSplits(path: String, sc: SparkContext): Array[BlockDetails] = {
+ val filePath = FileUtils.getPaths(path)
+ if (filePath == null || filePath.isEmpty) {
+ // return an empty array of block details
+ Array[BlockDetails]()
+ } else {
+ // clone the hadoop configuration
+ val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
+ // set folder or file
+ hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePath)
+ hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+ val newHadoopRDD = new NewHadoopRDD[LongWritable, Text](
+ sc,
+ classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
+ classOf[LongWritable],
+ classOf[Text],
+ hadoopConfiguration)
+ val splits: Array[FileSplit] = newHadoopRDD.getPartitions.map { part =>
+ part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
+ }
+ splits.map { block =>
+ new BlockDetails(block.getPath,
+ block.getStart,
+ block.getLength,
+ block.getLocations
+ )
+ }
+ }
+ }
+}
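The FileSplit-to-BlockDetails conversion at the end of getSplits can be sketched without a SparkContext. Here a local case class stands in for org.apache.carbondata.core.load.BlockDetails, and the path and host are illustrative (assumes the Hadoop client libraries are on the classpath):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.lib.input.FileSplit

    case class BlockDetailsSketch(path: Path, start: Long, length: Long, locations: Array[String])

    object SplitMappingExample {
      def main(args: Array[String]): Unit = {
        val splits = Array(new FileSplit(new Path("/data/a.csv"), 0L, 1024L, Array("host1")))
        // Copy the offset, length and locality information from each split.
        val blocks = splits.map(s => BlockDetailsSketch(s.getPath, s.getStart, s.getLength, s.getLocations))
        blocks.foreach(b => println(s"${b.path} ${b.start} ${b.length}"))
      }
    }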
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
deleted file mode 100644
index 6a2c839..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.scan.executor.QueryExecutor;
-import org.apache.carbondata.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.scan.model.QueryDimension;
-import org.apache.carbondata.scan.model.QueryMeasure;
-import org.apache.carbondata.scan.model.QueryModel;
-import org.apache.carbondata.scan.result.BatchResult;
-import org.apache.carbondata.scan.result.iterator.RawResultIterator;
-
-/**
- * Executor class for executing the query on the selected segments to be merged.
- * This will fire a select * query and get the raw result.
- */
-public class CarbonCompactionExecutor {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
- private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
- private final SegmentProperties destinationSegProperties;
- private final String databaseName;
- private final String factTableName;
- private final Map<String, TaskBlockInfo> segmentMapping;
- private final String storePath;
- private QueryExecutor queryExecutor;
- private CarbonTable carbonTable;
- private QueryModel queryModel;
-
- /**
- * Constructor
- *
- * @param segmentMapping
- * @param segmentProperties
- * @param databaseName
- * @param factTableName
- * @param storePath
- * @param carbonTable
- */
- public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
- SegmentProperties segmentProperties, String databaseName, String factTableName,
- String storePath, CarbonTable carbonTable,
- Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) {
-
- this.segmentMapping = segmentMapping;
-
- this.destinationSegProperties = segmentProperties;
-
- this.databaseName = databaseName;
-
- this.factTableName = factTableName;
-
- this.storePath = storePath;
-
- this.carbonTable = carbonTable;
-
- this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
- }
-
- /**
- * For processing of the table blocks.
- *
- * @return List of Carbon iterators
- */
- public List<RawResultIterator> processTableBlocks() throws QueryExecutionException {
-
- List<RawResultIterator> resultList =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- List<TableBlockInfo> list = null;
- queryModel = prepareQueryModel(list);
- // iterate each seg ID
- for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
- String segmentId = taskMap.getKey();
- List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-
- int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
-
- SegmentProperties sourceSegProperties =
- new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
-
- // for each segment get taskblock info
- TaskBlockInfo taskBlockInfo = taskMap.getValue();
- Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
-
- for (String task : taskBlockListMapping) {
-
- list = taskBlockInfo.getTableBlockInfoList(task);
- Collections.sort(list);
- LOGGER.info("for task -" + task + "-block size is -" + list.size());
- queryModel.setTableBlockInfos(list);
- resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
- destinationSegProperties));
-
- }
- }
-
- return resultList;
- }
-
- /**
- * get executor and execute the query model.
- *
- * @param blockList
- * @return
- */
- private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
- throws QueryExecutionException {
-
- queryModel.setTableBlockInfos(blockList);
- this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
- CarbonIterator<BatchResult> iter = null;
- try {
- iter = queryExecutor.execute(queryModel);
- } catch (QueryExecutionException e) {
- LOGGER.error(e.getMessage());
- throw e;
- }
-
- return iter;
- }
-
- /**
- * This method is used
- * for cleanup.
- */
- public void finish() {
- try {
- queryExecutor.finish();
- } catch (QueryExecutionException e) {
- LOGGER.error(e, "Problem while finish: ");
- }
- clearDictionaryFromQueryModel();
- }
-
- /**
- * This method will clear the dictionary access count after its usage is complete so
- * that the column can be deleted from the LRU cache whenever memory reaches the threshold
- */
- private void clearDictionaryFromQueryModel() {
- if (null != queryModel) {
- Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
- if (null != columnToDictionaryMapping) {
- for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
- CarbonUtil.clearDictionaryCache(entry.getValue());
- }
- }
- }
- }
-
- /**
- * Preparing of the query model.
- *
- * @param blockList
- * @return
- */
- public QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-
- QueryModel model = new QueryModel();
-
- model.setTableBlockInfos(blockList);
- model.setCountStarQuery(false);
- model.setDetailQuery(true);
- model.setForcedDetailRawQuery(true);
- model.setFilterExpressionResolverTree(null);
-
- List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- for (CarbonDimension dim : destinationSegProperties.getDimensions()) {
- QueryDimension queryDimension = new QueryDimension(dim.getColName());
- dims.add(queryDimension);
- }
- model.setQueryDimension(dims);
-
- List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (CarbonMeasure carbonMeasure : destinationSegProperties.getMeasures()) {
- QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
- msrs.add(queryMeasure);
- }
- model.setQueryMeasures(msrs);
-
- model.setQueryId(System.nanoTime() + "");
-
- model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-
- model.setAggTable(false);
- model.setLimit(-1);
-
- model.setTable(carbonTable);
-
- model.setInMemoryRecordSize(CarbonCommonConstants.COMPACTION_INMEMORY_RECORD_SIZE);
-
- return model;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
deleted file mode 100644
index ed669a6..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.CarbonUtilException;
-
-import org.apache.spark.sql.hive.TableMeta;
-
-/**
- * Utility Class for the Compaction Flow.
- */
-public class CarbonCompactionUtil {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-
- /**
- * To create a mapping of Segment Id and TableBlockInfo.
- *
- * @param tableBlockInfoList
- * @return
- */
- public static Map<String, TaskBlockInfo> createMappingForSegments(
- List<TableBlockInfo> tableBlockInfoList) {
-
- // stores taskBlockInfo of each segment
- Map<String, TaskBlockInfo> segmentBlockInfoMapping =
- new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-
- for (TableBlockInfo info : tableBlockInfoList) {
- String segId = info.getSegmentId();
- // check if segId is already present in map
- TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
- // extract task ID from file Path.
- String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
- // if taskBlockInfo is not there, then create and add
- if (null == taskBlockInfoMapping) {
- taskBlockInfoMapping = new TaskBlockInfo();
- groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
- // put the taskBlockInfo with respective segment id
- segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
- } else {
- groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
- }
- }
- return segmentBlockInfoMapping;
-
- }
-
- /**
- * Grouping the taskNumber and list of TableBlockInfo.
- * @param info
- * @param taskBlockMapping
- * @param taskNo
- */
- private static void groupCorrespodingInfoBasedOnTask(TableBlockInfo info,
- TaskBlockInfo taskBlockMapping, String taskNo) {
- // get the corresponding list from task mapping.
- List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
- if (null != blockLists) {
- blockLists.add(info);
- } else {
- blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- blockLists.add(info);
- taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
- }
- }
-
- /**
- * To create a mapping of Segment Id and DataFileFooter.
- *
- * @param tableBlockInfoList
- * @return
- */
- public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
- List<TableBlockInfo> tableBlockInfoList) throws IndexBuilderException {
-
- Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
- for (TableBlockInfo blockInfo : tableBlockInfoList) {
- List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
- String segId = blockInfo.getSegmentId();
-
- DataFileFooter dataFileMatadata = null;
- // check if segId is already present in map
- List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
- try {
- dataFileMatadata = CarbonUtil
- .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
- blockInfo.getBlockLength());
- } catch (CarbonUtilException e) {
- throw new IndexBuilderException(e);
- }
- if (null == metadataList) {
- // if it is not present
- eachSegmentBlocks.add(dataFileMatadata);
- segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
- } else {
-
- // if its already present then update the list.
- metadataList.add(dataFileMatadata);
- }
- }
- return segmentBlockInfoMapping;
-
- }
-
- /**
- * Check whether the file to indicate the compaction is present or not.
- * @param metaFolderPath
- * @return
- */
- public static boolean isCompactionRequiredForTable(String metaFolderPath) {
- String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.minorCompactionRequiredFile;
-
- String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.majorCompactionRequiredFile;
- try {
- if (FileFactory.isFileExist(minorCompactionStatusFile,
- FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
- .isFileExist(majorCompactionStatusFile,
- FileFactory.getFileType(majorCompactionStatusFile))) {
- return true;
- }
- } catch (IOException e) {
- LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage() );
- }
- return false;
- }
-
- /**
- * Determine the type of the compaction received.
- * @param metaFolderPath
- * @return
- */
- public static CompactionType determineCompactionType(String metaFolderPath) {
- String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.minorCompactionRequiredFile;
-
- String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.majorCompactionRequiredFile;
- try {
- if (FileFactory.isFileExist(minorCompactionStatusFile,
- FileFactory.getFileType(minorCompactionStatusFile))) {
- return CompactionType.MINOR_COMPACTION;
- }
- if (FileFactory.isFileExist(majorCompactionStatusFile,
- FileFactory.getFileType(majorCompactionStatusFile))) {
- return CompactionType.MAJOR_COMPACTION;
- }
-
- } catch (IOException e) {
- LOGGER.error("Exception in determining the compaction request file " + e.getMessage() );
- }
- return CompactionType.MINOR_COMPACTION;
- }
-
- /**
- * Delete the compaction request file once the compaction is done.
- * @param metaFolderPath
- * @param compactionType
- * @return
- */
- public static boolean deleteCompactionRequiredFile(String metaFolderPath,
- CompactionType compactionType) {
- String compactionRequiredFile;
- if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
- compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.minorCompactionRequiredFile;
- } else {
- compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.majorCompactionRequiredFile;
- }
- try {
- if (FileFactory
- .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
- if (FileFactory
- .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
- .delete()) {
- LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
- return true;
- } else {
- LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
- }
- } else {
- LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
- }
- } catch (IOException e) {
- LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
- }
- return false;
- }
-
- /**
- * Create the compaction request file if some other compaction is in progress.
- * @param metaFolderPath
- * @param compactionType
- * @return
- */
- public static boolean createCompactionRequiredFile(String metaFolderPath,
- CompactionType compactionType) {
- String statusFile;
- if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
- statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.minorCompactionRequiredFile;
- } else {
- statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.majorCompactionRequiredFile;
- }
- try {
- if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
- if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
- LOGGER.info("successfully created a compaction required file - " + statusFile);
- return true;
- } else {
- LOGGER.error("Not able to create a compaction required file - " + statusFile);
- return false;
- }
- } else {
- LOGGER.info("Compaction request file : " + statusFile + " already exist.");
- }
- } catch (IOException e) {
- LOGGER.error("Exception in creating the compaction request file " + e.getMessage() );
- }
- return false;
- }
-
- /**
- * This will check if any compaction request has been received for any table.
- *
- * @param tableMetas
- * @return
- */
- public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
- List<CarbonTableIdentifier> skipList) {
- for (TableMeta table : tableMetas) {
- CarbonTable ctable = table.carbonTable();
- String metadataPath = ctable.getMetaDataFilepath();
- // check for the compaction required file and at the same time exclude the tables which are
- // present in the skip list.
- if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
- .contains(table.carbonTableIdentifier())) {
- return table;
- }
- }
- return null;
- }
-}
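The segment-and-task grouping implemented by createMappingForSegments and groupCorrespodingInfoBasedOnTask above amounts to a two-level grouping of block infos. A compact Scala sketch with stand-in types (BlockInfo and all values are hypothetical):

    case class BlockInfo(segmentId: String, taskNo: String, filePath: String)

    object SegmentTaskGrouping {
      def main(args: Array[String]): Unit = {
        val blocks = Seq(
          BlockInfo("0", "t1", "/store/f1"),
          BlockInfo("0", "t2", "/store/f2"),
          BlockInfo("1", "t1", "/store/f3"))
        // segment id -> (task number -> blocks), mirroring createMappingForSegments
        val mapping = blocks.groupBy(_.segmentId).map { case (seg, bs) => seg -> bs.groupBy(_.taskNo) }
        mapping.foreach(println)
      }
    }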
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java
deleted file mode 100644
index 3aa0e9f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionCallable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.merger;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.spark.rdd.Compactor;
-
-import org.apache.spark.sql.execution.command.CompactionCallableModel;
-
-/**
- * Callable class which is used to trigger the compaction in a separate callable.
- */
-public class CompactionCallable implements Callable<Void> {
-
- private final CompactionCallableModel compactionCallableModel;
-
- public CompactionCallable(CompactionCallableModel compactionCallableModel) {
-
- this.compactionCallableModel = compactionCallableModel;
- }
-
- @Override public Void call() throws Exception {
-
- Compactor.triggerCompaction(compactionCallableModel);
- return null;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java
deleted file mode 100644
index 40dd59d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CompactionType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-/**
- * This enum defines the types of compaction.
- * There are two: minor and major.
- */
-public enum CompactionType {
- MINOR_COMPACTION,
- MAJOR_COMPACTION
-}
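A caller resolving the user-facing compaction command would branch on this enum; a hedged sketch of such a mapping (the helper name and the default-to-minor behaviour are illustrative, not taken from this commit):

    // Sketch: map an ALTER TABLE ... COMPACT '<type>' argument onto the enum.
    static CompactionType resolveCompactionType(String userInput) {
      if ("major".equalsIgnoreCase(userInput.trim())) {
        return CompactionType.MAJOR_COMPACTION;
      }
      return CompactionType.MINOR_COMPACTION;
    }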
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
deleted file mode 100644
index 1028f78..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.io.File;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
-
-/**
- * Merger class responsible for merging the segments during compaction.
- */
-public class RowResultMerger {
-
- private final String databaseName;
- private final String tableName;
- private final String tempStoreLocation;
- private final int measureCount;
- private final String factStoreLocation;
- private CarbonFactHandler dataHandler;
- private List<RawResultIterator> rawResultIteratorList =
- new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- private SegmentProperties segprop;
- /**
- * record holder heap
- */
- private AbstractQueue<RawResultIterator> recordHolderHeap;
-
- private TupleConversionAdapter tupleConvertor;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RowResultMerger.class.getName());
-
- public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
- String tableName, SegmentProperties segProp, String tempStoreLocation,
- CarbonLoadModel loadModel, int[] colCardinality) {
-
- // hold the raw result iterators, one per segment being merged
- this.rawResultIteratorList = iteratorList;
-
- recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
- new RowResultMerger.CarbonMdkeyComparator());
-
- this.segprop = segProp;
- this.tempStoreLocation = tempStoreLocation;
-
- this.factStoreLocation = loadModel.getStorePath();
-
- if (!new File(tempStoreLocation).mkdirs()) {
-   LOGGER.error("Failed to create temp store location: " + tempStoreLocation);
- }
-
- this.databaseName = databaseName;
- this.tableName = tableName;
-
- this.measureCount = segprop.getMeasures().size();
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- CarbonFactDataHandlerModel carbonFactDataHandlerModel =
- getCarbonFactDataHandlerModel(loadModel);
- carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
- CarbonDataFileAttributes carbonDataFileAttributes =
- new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
- loadModel.getFactTimeStamp());
- carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
- if (segProp.getNumberOfNoDictionaryDimension() > 0
- || segProp.getComplexDimensions().size() > 0) {
- carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
- } else {
- carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
- }
- carbonFactDataHandlerModel.setColCardinality(colCardinality);
- carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-
- tupleConvertor = new TupleConversionAdapter(segProp);
- }
-
- /**
- * Merge function: k-way merges the sorted segment iterators and writes
- * the merged rows through the fact data handler.
- *
- * @return true if the merge completed successfully
- */
- public boolean mergerSlice() {
- boolean mergeStatus = false;
- int index = 0;
- try {
-
- dataHandler.initialise();
-
- // add all iterators to the queue
- for (RawResultIterator leafTupleIterator : this.rawResultIteratorList) {
-   this.recordHolderHeap.add(leafTupleIterator);
- index++;
- }
- RawResultIterator iterator = null;
- while (index > 1) {
- // poll the iterator whose current record is smallest
- iterator = this.recordHolderHeap.poll();
- Object[] convertedRow = iterator.next();
- if (null == convertedRow) {
- throw new SliceMergerException("Unable to generate mdkey during compaction.");
- }
- // get the mdkey
- addRow(convertedRow);
- // if this iterator is exhausted, decrement the count of live
- // iterators
- if (!iterator.hasNext()) {
- index--;
- continue;
- }
- // add record to heap
- this.recordHolderHeap.add(iterator);
- }
- // only one iterator remains; drain it directly from the heap
- iterator = this.recordHolderHeap.poll();
- if (null == iterator) {
-   // defensive: no iterators were supplied, so there is nothing to merge
-   throw new SliceMergerException("No record iterators available for merge.");
- }
- while (true) {
- Object[] convertedRow = iterator.next();
- if (null == convertedRow) {
- throw new SliceMergerException("Unable to generate mdkey during compaction.");
- }
- addRow(convertedRow);
- // check if leaf contains no record
- if (!iterator.hasNext()) {
- break;
- }
- }
- this.dataHandler.finish();
- mergeStatus = true;
- } catch (Exception e) {
- LOGGER.error("Exception in compaction merger " + e.getMessage());
- mergeStatus = false;
- } finally {
- try {
- this.dataHandler.closeHandler();
- } catch (CarbonDataWriterException e) {
- LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
- mergeStatus = false;
- }
- }
-
- return mergeStatus;
- }
-
- /**
- * Below method will be used to add a sorted row to the data handler.
- *
- * @throws SliceMergerException if the data handler fails to accept the row
- */
- protected void addRow(Object[] carbonTuple) throws SliceMergerException {
- Object[] rowInWritableFormat;
-
- rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
- try {
- this.dataHandler.addDataToStore(rowInWritableFormat);
- } catch (CarbonDataWriterException e) {
- throw new SliceMergerException("Problem in merging the slice", e);
- }
- }
-
- /**
- * This method will create a model object for carbon fact data handler
- *
- * @param loadModel load model carrying the task number, timestamp and schema
- * @return a configured CarbonFactDataHandlerModel for the compaction flow
- */
- private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) {
- CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
- carbonFactDataHandlerModel.setDatabaseName(databaseName);
- carbonFactDataHandlerModel.setTableName(tableName);
- carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size());
- carbonFactDataHandlerModel.setCompactionFlow(true);
- carbonFactDataHandlerModel
- .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes());
- carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
- carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality());
- carbonFactDataHandlerModel.setSegmentProperties(segprop);
- carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension());
- carbonFactDataHandlerModel.setDimensionCount(
- segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount());
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- List<ColumnSchema> wrapperColumnSchema = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
- carbonTable.getMeasureByTableName(tableName));
- carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
- // TODO: handle complex types here.
- Map<Integer, GenericDataType> complexIndexMap =
- new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size());
- carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
- carbonFactDataHandlerModel.setDataWritingRequest(true);
-
- char[] aggType = new char[segprop.getMeasures().size()];
- Arrays.fill(aggType, 'n');
- int i = 0;
- for (CarbonMeasure msr : segprop.getMeasures()) {
- aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
- }
- carbonFactDataHandlerModel.setAggType(aggType);
- carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality());
-
- String carbonDataDirectoryPath =
- checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName,
- loadModel.getPartitionId(), loadModel.getSegmentId());
- carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
-
- List<CarbonDimension> dimensionByTableName =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
- boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
- int index = 0;
- for (CarbonDimension dimension : dimensionByTableName) {
- isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
- }
- carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
- return carbonFactDataHandlerModel;
- }
-
- /**
- * This method will get, and create if absent, the store location for the
- * given path, partition id and segment id.
- *
- * @return data directory path
- */
- private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
- String tableName, String partitionId, String segmentId) {
- String carbonStorePath = factStoreLocation;
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
- String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
- CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
- return carbonDataDirectoryPath;
- }
-
- /**
- * Comparator class for comparing two raw row results by their mdkey.
- */
- private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
-
- @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
-
- // initialise to null so the null check below can detect a failed fetch
- Object[] row1 = null;
- Object[] row2 = null;
- try {
- row1 = o1.fetchConverted();
- row2 = o2.fetchConverted();
- } catch (KeyGenException e) {
- LOGGER.error(e.getMessage());
- }
- if (null == row1 || null == row2) {
- return 0;
- }
- ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
- ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
- int compareResult = 0;
- int[] columnValueSizes = segprop.getEachDimColumnValueSize();
- int dictionaryKeyOffset = 0;
- byte[] dimCols1 = key1.getDictionaryKey();
- byte[] dimCols2 = key2.getDictionaryKey();
- int noDicIndex = 0;
- for (int eachColumnValueSize : columnValueSizes) {
- // case of dictionary cols
- if (eachColumnValueSize > 0) {
-
- compareResult = ByteUtil.UnsafeComparer.INSTANCE
- .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
- dictionaryKeyOffset, eachColumnValueSize);
- dictionaryKeyOffset += eachColumnValueSize;
- } else { // case of no dictionary
-
- byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
- byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
- compareResult =
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
- noDicIndex++;
-
- }
- if (0 != compareResult) {
- return compareResult;
- }
- }
- return 0;
- }
- }
-
-}
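mergerSlice is a textbook k-way merge: every sorted segment contributes one iterator, a priority queue keeps the iterators ordered by the row each will yield next, and rows are drained smallest-first into the data handler. Stripped of the Carbon specifics, the underlying pattern can be sketched over generic sorted iterators:

    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;
    import java.util.function.Consumer;

    static <T> void kWayMerge(List<Iterator<T>> inputs, Comparator<T> cmp, Consumer<T> sink) {
      // buffer one element per iterator so the heap can order iterators
      // by the element each will yield next
      class Head {
        final T value;
        final Iterator<T> rest;
        Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
      }
      PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
      for (Iterator<T> it : inputs) {
        if (it.hasNext()) {
          heap.add(new Head(it.next(), it));
        }
      }
      while (!heap.isEmpty()) {
        Head smallest = heap.poll();
        sink.accept(smallest.value);  // analogous to addRow(...) above
        if (smallest.rest.hasNext()) {
          heap.add(new Head(smallest.rest.next(), smallest.rest));
        }
      }
    }

RowResultMerger differs only in that RawResultIterator can peek at its next row (fetchConverted), so the iterators sit in the queue directly instead of being wrapped.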
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java
deleted file mode 100644
index c35b3ff..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/TupleConversionAdapter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.integration.spark.merger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
-import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
-
-/**
- * This class will be used to convert the result into the format used by the data writer.
- */
-public class TupleConversionAdapter {
-
- private final SegmentProperties segmentproperties;
-
- private final List<CarbonMeasure> measureList;
-
- private int noDictionaryPresentIndex;
-
- private int measureCount;
-
- private boolean isNoDictionaryPresent;
-
- public TupleConversionAdapter(SegmentProperties segmentProperties) {
- this.measureCount = segmentProperties.getMeasures().size();
- this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
- if (isNoDictionaryPresent) {
- noDictionaryPresentIndex++;
- }
- this.segmentproperties = segmentProperties;
- measureList = segmentProperties.getMeasures();
- }
-
- /**
- * Converts a raw result tuple into the row format expected by the data writer.
- *
- * @param carbonTuple raw tuple laid out as [ByteArrayWrapper key, measure1, ..., measureN]
- * @return row laid out as [measures..., packed no-dictionary bytes (if any), dictionary key]
- */
- public Object[] getObjectArray(Object[] carbonTuple) {
- Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
- int index = 0;
- // put measures.
-
- for (int j = 1; j <= measureCount; j++) {
- row[index++] = carbonTuple[j];
- }
-
- // put the packed no-dictionary byte array
- if (isNoDictionaryPresent) {
-
- int noDicCount = segmentproperties.getNumberOfNoDictionaryDimension();
- List<byte[]> noDicByteArr = new ArrayList<>(noDicCount);
- for (int i = 0; i < noDicCount; i++) {
- noDicByteArr.add(((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeyByIndex(i));
- }
- byte[] singleByteArr = RemoveDictionaryUtil.convertListByteArrToSingleArr(noDicByteArr);
-
- row[index++] = singleByteArr;
- }
-
- // put the dictionary key (mdkey)
- row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
- return row;
- }
-}
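For a segment with two measures and one no-dictionary dimension, the reshaping done by getObjectArray can be pictured as follows (keyWrapper and segmentProperties are illustrative placeholders):

    // input  carbonTuple: [ ByteArrayWrapper{dictKey, noDictKeys}, m1, m2 ]
    // output row:         [ m1, m2, packedNoDictBytes, dictKey ]
    Object[] in = { keyWrapper, 10L, 2.5d };
    Object[] out = new TupleConversionAdapter(segmentProperties).getObjectArray(in);
    // out.length == measureCount + 1 (packed no-dict slot) + 1 (dict key) == 4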
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
deleted file mode 100644
index de7d4a2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-
-import java.util.Locale;
-
-public class MalformedCarbonCommandException extends Exception {
-
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public MalformedCarbonCommandException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public MalformedCarbonCommandException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object representing a specific geographical,
- * political, or cultural region.
- * @return - Localized error message (currently a stub that returns an empty string).
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
- * getLocalizedMessage
- */
- @Override
- public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
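Since getLocalizedMessage(Locale) is effectively a stub that returns an empty string, callers should rely on getMessage(); a quick sketch (the message text is illustrative):

    import java.util.Locale;

    MalformedCarbonCommandException e =
        new MalformedCarbonCommandException("Invalid table properties");
    System.out.println(e.getMessage());                   // "Invalid table properties"
    System.out.println(e.getLocalizedMessage(Locale.US)); // "" - locale variant not implemented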