Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:33 UTC
[16/20] incubator-carbondata git commit: Merge remote-tracking branch 'HuaweiBigData/master' into apache/master
Merge remote-tracking branch 'HuaweiBigData/master' into apache/master
Conflicts:
core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/97598381
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/97598381
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/97598381
Branch: refs/heads/master
Commit: 975983816666c07b401fdc75b9729c3fabe61e88
Parents: 29f9cf2 89847ea
Author: ravipesala <ra...@gmail.com>
Authored: Thu Aug 4 19:28:37 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 4 19:28:37 2016 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 +
.../store/impl/DFSFileHolderImpl.java | 4 +-
.../datastorage/store/impl/FileFactory.java | 6 +-
.../carbondata/core/reader/ThriftReader.java | 7 +-
.../CarbonDictionarySortIndexReaderImpl.java | 43 +++-
.../carbondata/core/writer/ThriftWriter.java | 7 +-
...CarbonDictionarySortIndexReaderImplTest.java | 15 +-
...CarbonDictionarySortIndexWriterImplTest.java | 15 ++
.../allqueries/AllDataTypesTestCase4.scala | 4 +-
.../allqueries/AllDataTypesTestCase6.scala | 16 +-
.../carbondata/spark/load/CarbonLoaderUtil.java | 28 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 7 +-
.../execution/command/carbonTableSchema.scala | 10 +-
.../spark/rdd/CarbonDataLoadRDD.scala | 21 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 18 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 21 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 254 +++++++++----------
.../org/carbondata/spark/rdd/Compactor.scala | 1 +
.../carbondata/spark/util/CarbonScalaUtil.scala | 50 +++-
.../testsuite/bigdecimal/TestBigDecimal.scala | 59 ++++-
.../MajorCompactionStopsAfterCompaction.scala | 2 +-
.../dataretention/DataRetentionTestCase.scala | 33 ++-
.../processing/csvload/DataGraphExecuter.java | 7 +-
.../processing/csvload/GraphExecutionUtil.java | 5 +-
.../processing/csvreaderstep/CsvInput.java | 24 +-
.../csvreaderstep/UnivocityCsvParser.java | 1 +
.../sortdata/IntermediateFileMerger.java | 5 +-
.../store/CarbonFactDataHandlerColumnar.java | 28 +-
.../csvbased/CarbonCSVBasedSeqGenStep.java | 23 +-
30 files changed, 450 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 22b6021,e0ccd2b..6a460e8
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -1288,37 -1308,35 +1287,37 @@@ class CarbonSqlParser(
}
protected lazy val showLoads: Parser[LogicalPlan] =
- SHOW ~> (LOADS|SEGMENTS) ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+ SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
(LIMIT ~> numericLit).? <~
opt(";") ^^ {
- case schemaName ~ cubeName ~ limit =>
- ShowLoadsCommand(schemaName, cubeName.toLowerCase(), limit)
+ case databaseName ~ tableName ~ limit =>
+ ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
}
protected lazy val segmentId: Parser[String] =
- ( numericLit ^^ { u => u } |
- elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
- )
+ numericLit ^^ { u => u } |
+ elem("decimal", p => {
+ p.getClass.getSimpleName.equals("FloatLit") ||
+ p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
- DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+ DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
(ident <~ ".").? ~ ident) <~
opt(";") ^^ {
- case loadids ~ cube => cube match {
- case schemaName ~ cubeName => DeleteLoadsById(loadids, schemaName, cubeName.toLowerCase())
+ case loadids ~ table => table match {
+ case databaseName ~ tableName =>
+ DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
}
}
protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
- DELETE ~> (LOADS|SEGMENTS) ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+ DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
(WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
opt(";") ^^ {
- case schema ~ cube ~ condition =>
+ case schema ~ table ~ condition =>
condition match {
case dateField ~ dateValue =>
- DeleteLoadsByLoadDate(schema, cube.toLowerCase(), dateField, dateValue)
+ DeleteLoadsByLoadDate(schema, table.toLowerCase(), dateField, dateValue)
}
}
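The hunk above resolves the merge in favor of the table/database vocabulary (databaseName/tableName over the old schemaName/cubeName) and narrows the grammar to the SEGMENT/SEGMENTS keywords. Below is a minimal, self-contained sketch of the scala-parser-combinators pattern these productions rely on: keywords chained with ~>, an optional "db." prefix via (ident <~ ".").?, and ^^ mapping the parse result onto a command case class. MiniSegmentsParser and ShowSegments are illustrative names, not CarbonData API.

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

case class ShowSegments(db: Option[String], table: String, limit: Option[String])

object MiniSegmentsParser extends StandardTokenParsers {
  lexical.reserved ++= Seq("SHOW", "SEGMENTS", "FOR", "TABLE", "LIMIT")
  lexical.delimiters ++= Seq(".", ";")

  // SHOW SEGMENTS FOR TABLE [db.]tbl [LIMIT n], optionally terminated by ";"
  lazy val showSegments: Parser[ShowSegments] =
    "SHOW" ~> "SEGMENTS" ~> "FOR" ~> "TABLE" ~> (ident <~ ".").? ~ ident ~
      ("LIMIT" ~> numericLit).? <~ opt(";") ^^ {
      case db ~ table ~ limit => ShowSegments(db, table.toLowerCase, limit)
    }

  def parse(sql: String): Option[ShowSegments] =
    phrase(showSegments)(new lexical.Scanner(sql)) match {
      case Success(cmd, _) => Some(cmd)
      case _ => None
    }
}

// MiniSegmentsParser.parse("SHOW SEGMENTS FOR TABLE db.t LIMIT 10")
// returns Some(ShowSegments(Some("db"), "t", Some("10")))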
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d19d1c5,cc08604..11358fc
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -798,16 -1225,13 +798,13 @@@ private[sql] case class AlterTableCompa
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.cubeMeta.storePath)
+ carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+ carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+ carbonLoadModel.setStorePath(relation.tableMeta.storePath)
- val partitioner = relation.cubeMeta.partitioner
+ val partitioner = relation.tableMeta.partitioner
+ val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
- var kettleHomePath = CarbonScalaUtil.getKettleHomePath(sqlContext)
- if (kettleHomePath == null) {
- sys.error(s"carbon.kettle.home is not set")
- }
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir")
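In the AlterTableCompaction hunk the merge keeps the single helper call, CarbonScalaUtil.getKettleHome(sqlContext), and drops the inline null check that the old getKettleHomePath variant needed, so the helper now owns both the lookup and the failure message. A hedged sketch of that resolve-then-fail pattern follows; resolveKettleHome and its lookup argument are hypothetical, not the actual CarbonScalaUtil signature.

object KettleHome {
  val KettleHomeKey = "carbon.kettle.home"

  // Resolve from a caller-supplied config source, fall back to a JVM
  // system property, and raise one clear error if neither is set.
  def resolveKettleHome(confLookup: String => Option[String]): String =
    confLookup(KettleHomeKey)
      .orElse(sys.props.get(KettleHomeKey))
      .getOrElse(sys.error(s"$KettleHomeKey is not set"))
}

// Usage with any Map-backed configuration source:
// KettleHome.resolveKettleHome(Map("carbon.kettle.home" -> "/opt/kettle").get)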
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 5a1dedc,0000000..144af04
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,290 -1,0 +1,290 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.spark.rdd
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.carbondata.common.CarbonIterator
+import org.carbondata.common.logging.LogServiceFactory
+import org.carbondata.core.cache.dictionary.Dictionary
+import org.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
+import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
+import org.carbondata.scan.executor.QueryExecutorFactory
+import org.carbondata.scan.expression.Expression
+import org.carbondata.scan.model.QueryModel
+import org.carbondata.scan.result.BatchResult
+import org.carbondata.scan.result.iterator.ChunkRowIterator
+import org.carbondata.spark.RawValue
+import org.carbondata.spark.load.CarbonLoaderUtil
+import org.carbondata.spark.util.QueryPlanUtil
+
+class CarbonSparkPartition(rddId: Int, val idx: Int,
- val locations: Array[String],
- val tableBlockInfos: util.List[TableBlockInfo])
++ val locations: Array[String],
++ val tableBlockInfos: util.List[TableBlockInfo])
+ extends Partition {
+
+ override val index: Int = idx
+
+ // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
+ override def hashCode(): Int = {
+ 41 * (41 + rddId) + idx
+ }
+}
+
- /**
- * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
- * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
- * level filtering in driver side.
- */
++/**
++ * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
++ * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
++ * level filtering in driver side.
++ */
+class CarbonScanRDD[V: ClassTag](
- sc: SparkContext,
- queryModel: QueryModel,
- filterExpression: Expression,
- keyClass: RawValue[V],
- @transient conf: Configuration,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
- baseStoreLocation: String)
++ sc: SparkContext,
++ queryModel: QueryModel,
++ filterExpression: Expression,
++ keyClass: RawValue[V],
++ @transient conf: Configuration,
++ tableCreationTime: Long,
++ schemaLastUpdatedTime: Long,
++ baseStoreLocation: String)
+ extends RDD[V](sc, Nil) with Logging {
+
+ val defaultParallelism = sc.defaultParallelism
+
+ override def getPartitions: Array[Partition] = {
+ val statisticRecorder = new QueryStatisticsRecorder(queryModel.getQueryId)
+ val startTime = System.currentTimeMillis()
+ val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
+ QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+
+ val result = new util.ArrayList[Partition](defaultParallelism)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ // set filter resolver tree
+ try {
+ // before applying filter check whether segments are available in the table.
+ val splits = carbonInputFormat.getSplits(job)
+ if (!splits.isEmpty) {
+ val filterResolver = carbonInputFormat
+ .getResolvedFilter(job.getConfiguration, filterExpression)
+ CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
+ queryModel.setFilterExpressionResolverTree(filterResolver)
+ }
+ }
+ catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ sys.error("Exception occurred in query execution :: " + e.getMessage)
+ }
+ // get splits
+ val splits = carbonInputFormat.getSplits(job)
+ if (!splits.isEmpty) {
+ val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+
+ val blockList = carbonInputSplits.map(inputSplit =>
+ new TableBlockInfo(inputSplit.getPath.toString,
+ inputSplit.getStart, inputSplit.getSegmentId,
+ inputSplit.getLocations, inputSplit.getLength
+ ).asInstanceOf[Distributable]
+ )
+ if (blockList.nonEmpty) {
+ // group blocks to nodes, tasks
+ val startTime = System.currentTimeMillis
+ var statistic = new QueryStatistic
+ val activeNodes = DistributionUtil
+ .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
+ val nodeBlockMapping =
+ CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
+ activeNodes.toList.asJava
+ )
+ val timeElapsed: Long = System.currentTimeMillis - startTime
+ statistic.addStatistics("Total Time taken in block(s) allocation", System.currentTimeMillis)
+ statisticRecorder.recordStatistics(statistic);
+ statistic = new QueryStatistic
+ var i = 0
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { entry =>
+ entry._2.asScala.foreach { blocksPerTask => {
+ val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
+ if (blocksPerTask.size() != 0) {
+ result
+ .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
+ i += 1
+ }
+ }
+ }
+ }
+ val noOfBlocks = blockList.size
+ val noOfNodes = nodeBlockMapping.size
+ val noOfTasks = result.size()
+ logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
+ + s"parallelism: $defaultParallelism , " +
+ s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
+ )
+ statistic.addStatistics("Time taken to identify Block(s) to scan", System.currentTimeMillis)
+ statisticRecorder.recordStatistics(statistic);
+ statisticRecorder.logStatistics
+ result.asScala.foreach { r =>
+ val cp = r.asInstanceOf[CarbonSparkPartition]
+ logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
+ + ", No.Of Blocks : " + cp.tableBlockInfos.size()
+ )
+ }
+ } else {
+ logInfo("No blocks identified to scan")
+ val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+ result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
+ }
+ }
+ else {
+ logInfo("No valid segments found to scan")
+ val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+ result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
+ }
+ result.toArray(new Array[Partition](result.size()))
+ }
+
- override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val iter = new Iterator[V] {
- var rowIterator: CarbonIterator[Array[Any]] = _
- var queryStartTime: Long = 0
- try {
- val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
- if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
- queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
- // fill table block info
- queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
- queryStartTime = System.currentTimeMillis
++ override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
++ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
++ val iter = new Iterator[V] {
++ var rowIterator: CarbonIterator[Array[Any]] = _
++ var queryStartTime: Long = 0
++ try {
++ val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
++ if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
++ queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
++ // fill table block info
++ queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
++ queryStartTime = System.currentTimeMillis
+
- val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
- logInfo("*************************" + carbonPropertiesFilePath)
- if (null == carbonPropertiesFilePath) {
- System.setProperty("carbon.properties.filepath",
- System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
- }
- // execute query
- rowIterator = new ChunkRowIterator(
- QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
- asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
++ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
++ logInfo("*************************" + carbonPropertiesFilePath)
++ if (null == carbonPropertiesFilePath) {
++ System.setProperty("carbon.properties.filepath",
++ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
++ }
++ // execute query
++ rowIterator = new ChunkRowIterator(
++ QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
++ asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+
- }
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- if (null != e.getMessage) {
- sys.error("Exception occurred in query execution :: " + e.getMessage)
- } else {
- sys.error("Exception occurred in query execution.Please check logs.")
- }
- }
++ }
++ } catch {
++ case e: Exception =>
++ LOGGER.error(e)
++ if (null != e.getMessage) {
++ sys.error("Exception occurred in query execution :: " + e.getMessage)
++ } else {
++ sys.error("Exception occurred in query execution.Please check logs.")
++ }
++ }
+
- var havePair = false
- var finished = false
- var recordCount = 0
++ var havePair = false
++ var finished = false
++ var recordCount = 0
+
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = (null == rowIterator) || (!rowIterator.hasNext)
- havePair = !finished
- }
- if (finished) {
- clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
- if (null != queryModel.getStatisticsRecorder) {
- val queryStatistic = new QueryStatistic
- queryStatistic
- .addStatistics("Total Time taken to execute the query in executor Side",
- System.currentTimeMillis - queryStartTime
- )
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
- queryModel.getStatisticsRecorder.logStatistics();
- }
- }
- !finished
- }
++ override def hasNext: Boolean = {
++ if (!finished && !havePair) {
++ finished = (null == rowIterator) || (!rowIterator.hasNext)
++ havePair = !finished
++ }
++ if (finished) {
++ clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
++ if (null != queryModel.getStatisticsRecorder) {
++ val queryStatistic = new QueryStatistic
++ queryStatistic
++ .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
++ System.currentTimeMillis - queryStartTime
++ )
++ queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
++ queryModel.getStatisticsRecorder.logStatistics();
++ }
++ }
++ !finished
++ }
+
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- recordCount += 1
- if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
- clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
- if (null != queryModel.getStatisticsRecorder) {
- val queryStatistic = new QueryStatistic
- queryStatistic
- .addStatistics("Total Time taken to execute the query in executor Side",
- System.currentTimeMillis - queryStartTime
- )
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
- queryModel.getStatisticsRecorder.logStatistics();
- }
- }
- keyClass.getValue(rowIterator.next())
- }
- def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
- if (null != columnToDictionaryMap) {
- org.carbondata.spark.util.CarbonQueryUtil
- .clearColumnDictionaryCache(columnToDictionaryMap)
- }
- }
- }
- iter
- }
++ override def next(): V = {
++ if (!hasNext) {
++ throw new java.util.NoSuchElementException("End of stream")
++ }
++ havePair = false
++ recordCount += 1
++ if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
++ clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
++ if (null != queryModel.getStatisticsRecorder) {
++ val queryStatistic = new QueryStatistic
++ queryStatistic
++ .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
++ System.currentTimeMillis - queryStartTime
++ )
++ queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
++ queryModel.getStatisticsRecorder.logStatistics();
++ }
++ }
++ keyClass.getValue(rowIterator.next())
++ }
++ def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
++ if (null != columnToDictionaryMap) {
++ org.carbondata.spark.util.CarbonQueryUtil
++ .clearColumnDictionaryCache(columnToDictionaryMap)
++ }
++ }
++ }
++ iter
++ }
+
- /**
- * Get the preferred locations where to launch this task.
- */
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- val theSplit = partition.asInstanceOf[CarbonSparkPartition]
- val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
- val tableBlocks = theSplit.tableBlockInfos
- // node name and count mapping
- val blockMap = new util.LinkedHashMap[String, Integer]()
++ /**
++ * Get the preferred locations where to launch this task.
++ */
++ override def getPreferredLocations(partition: Partition): Seq[String] = {
++ val theSplit = partition.asInstanceOf[CarbonSparkPartition]
++ val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
++ val tableBlocks = theSplit.tableBlockInfos
++ // node name and count mapping
++ val blockMap = new util.LinkedHashMap[String, Integer]()
+
- tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
- location => {
- if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
- }
- }
- )
- )
++ tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
++ location => {
++ if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
++ val currentCount = blockMap.get(location)
++ if (currentCount == null) {
++ blockMap.put(location, 1)
++ } else {
++ blockMap.put(location, currentCount + 1)
++ }
++ }
++ }
++ )
++ )
+
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
- nodeCount1.getValue > nodeCount2.getValue
- }
- )
++ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
++ nodeCount1.getValue > nodeCount2.getValue
++ }
++ )
+
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
- }
++ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
++ firstOptionLocation ++ sortedNodesList
++ }
+}
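getPreferredLocations above keeps the split's own non-"localhost" hosts first, tallies how many of the partition's blocks each remaining replica host serves, and appends the two busiest hosts. The same heuristic as a standalone sketch in plain Scala, with no Carbon types; the names are illustrative.

// Locality heuristic: split-local hosts first, then the two hosts that
// serve the most blocks among the remaining replica locations.
def preferredLocations(
    splitLocations: Seq[String],
    blockReplicaHosts: Seq[Seq[String]]): Seq[String] = {
  val firstOption = splitLocations.filter(_ != "localhost")
  val counts = blockReplicaHosts.flatten
    .filterNot(h => firstOption.exists(_.equalsIgnoreCase(h)))
    .groupBy(identity)
    .map { case (host, hits) => host -> hits.size }
  val topTwo = counts.toSeq.sortBy { case (_, n) => -n }.map(_._1).take(2)
  firstOption ++ topTwo
}

// preferredLocations(Seq("node1"), Seq(Seq("node2"), Seq("node2", "node3")))
// returns Seq("node1", "node2", "node3")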
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 5406e77,87dd0ce..8b91745
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@@ -17,9 -17,13 +17,12 @@@
package org.carbondata.spark.util
+ import java.io.File
+
import scala.collection.JavaConverters._
+ import org.apache.spark.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.Level
import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
import org.apache.spark.sql.types._
@@@ -27,9 -31,11 +30,10 @@@ import org.carbondata.core.carbon.metad
import org.carbondata.core.carbon.metadata.encoder.Encoding
import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.carbondata.core.constants.CarbonCommonConstants
+ import org.carbondata.core.datastorage.store.impl.FileFactory
-import org.carbondata.core.util.CarbonProperties
-import org.carbondata.query.expression.{DataType => CarbonDataType}
+import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
- object CarbonScalaUtil {
+ object CarbonScalaUtil extends Logging {
def convertSparkToCarbonDataType(
dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
dataType match {
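The CarbonScalaUtil hunk swaps the Carbon DataType import and has the object extend Logging; the visible method, convertSparkToCarbonDataType, is a pattern match from Spark SQL types to the engine's type representation. A minimal sketch of that mapping is below, with EngineType as a stand-in since the real Carbon enum and its full set of cases are not shown in this diff.

import org.apache.spark.sql.types._

sealed trait EngineType
case object EString extends EngineType
case object EInt extends EngineType
case object ELong extends EngineType
case object EDouble extends EngineType
case object EDecimal extends EngineType
case object ETimestamp extends EngineType

// Map each Spark SQL type onto the stand-in engine type; unhandled
// types fail loudly rather than silently defaulting.
def convertSparkToEngineType(dt: DataType): EngineType = dt match {
  case StringType     => EString
  case IntegerType    => EInt
  case LongType       => ELong
  case DoubleType     => EDouble
  case _: DecimalType => EDecimal
  case TimestampType  => ETimestamp
  case other          => sys.error(s"unsupported Spark type: $other")
}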
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------