You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:33 UTC

[16/20] incubator-carbondata git commit: Merge remote-tracking branch 'HuaweiBigData/master' into apache/master

Merge remote-tracking branch 'HuaweiBigData/master' into apache/master

Conflicts:
	core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
	core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
	core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
	integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
	integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
	integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
	integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/97598381
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/97598381
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/97598381

Branch: refs/heads/master
Commit: 975983816666c07b401fdc75b9729c3fabe61e88
Parents: 29f9cf2 89847ea
Author: ravipesala <ra...@gmail.com>
Authored: Thu Aug 4 19:28:37 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 4 19:28:37 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../store/impl/DFSFileHolderImpl.java           |   4 +-
 .../datastorage/store/impl/FileFactory.java     |   6 +-
 .../carbondata/core/reader/ThriftReader.java    |   7 +-
 .../CarbonDictionarySortIndexReaderImpl.java    |  43 +++-
 .../carbondata/core/writer/ThriftWriter.java    |   7 +-
 ...CarbonDictionarySortIndexReaderImplTest.java |  15 +-
 ...CarbonDictionarySortIndexWriterImplTest.java |  15 ++
 .../allqueries/AllDataTypesTestCase4.scala      |   4 +-
 .../allqueries/AllDataTypesTestCase6.scala      |  16 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  28 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   7 +-
 .../execution/command/carbonTableSchema.scala   |  10 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |  21 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  18 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  21 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 254 +++++++++----------
 .../org/carbondata/spark/rdd/Compactor.scala    |   1 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |  50 +++-
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  59 ++++-
 .../MajorCompactionStopsAfterCompaction.scala   |   2 +-
 .../dataretention/DataRetentionTestCase.scala   |  33 ++-
 .../processing/csvload/DataGraphExecuter.java   |   7 +-
 .../processing/csvload/GraphExecutionUtil.java  |   5 +-
 .../processing/csvreaderstep/CsvInput.java      |  24 +-
 .../csvreaderstep/UnivocityCsvParser.java       |   1 +
 .../sortdata/IntermediateFileMerger.java        |   5 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  28 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  23 +-
 30 files changed, 450 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 22b6021,e0ccd2b..6a460e8
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -1288,37 -1308,35 +1287,37 @@@ class CarbonSqlParser(
    }
  
    protected lazy val showLoads: Parser[LogicalPlan] =
-     SHOW ~> (LOADS|SEGMENTS) ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+     SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
        (LIMIT ~> numericLit).? <~
        opt(";") ^^ {
 -      case schemaName ~ cubeName ~ limit =>
 -        ShowLoadsCommand(schemaName, cubeName.toLowerCase(), limit)
 +      case databaseName ~ tableName ~ limit =>
 +        ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
      }
  
    protected lazy val segmentId: Parser[String] =
 -    ( numericLit ^^ { u => u } |
 -      elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
 -      )
 +    numericLit ^^ { u => u } |
 +      elem("decimal", p => {
 +        p.getClass.getSimpleName.equals("FloatLit") ||
 +        p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
  
    protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
-     DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+     DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
        (ident <~ ".").? ~ ident) <~
        opt(";") ^^ {
 -      case loadids ~ cube => cube match {
 -        case schemaName ~ cubeName => DeleteLoadsById(loadids, schemaName, cubeName.toLowerCase())
 +      case loadids ~ table => table match {
 +        case databaseName ~ tableName =>
 +          DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
        }
      }
  
    protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
-     DELETE ~> (LOADS|SEGMENTS) ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+     DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
        (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
        opt(";") ^^ {
 -      case schema ~ cube ~ condition =>
 +      case schema ~ table ~ condition =>
          condition match {
            case dateField ~ dateValue =>
 -            DeleteLoadsByLoadDate(schema, cube.toLowerCase(), dateField, dateValue)
 +            DeleteLoadsByLoadDate(schema, table.toLowerCase(), dateField, dateValue)
          }
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d19d1c5,cc08604..11358fc
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -798,16 -1225,13 +798,13 @@@ private[sql] case class AlterTableCompa
      val dataLoadSchema = new CarbonDataLoadSchema(table)
      // Need to fill dimension relation
      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
 -    carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName)
 -    carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)
 -    carbonLoadModel.setStorePath(relation.cubeMeta.storePath)
 +    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
 +    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
 +    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
  
 -    val partitioner = relation.cubeMeta.partitioner
 +    val partitioner = relation.tableMeta.partitioner
+     val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
  
-     var kettleHomePath = CarbonScalaUtil.getKettleHomePath(sqlContext)
-     if (kettleHomePath == null) {
-       sys.error(s"carbon.kettle.home is not set")
-     }
      var storeLocation = CarbonProperties.getInstance
        .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
          System.getProperty("java.io.tmpdir")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 5a1dedc,0000000..144af04
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,290 -1,0 +1,290 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.carbondata.spark.rdd
 +
 +import java.util
 +
 +import scala.collection.JavaConverters._
 +import scala.reflect.ClassTag
 +
 +import org.apache.hadoop.conf.Configuration
 +import org.apache.hadoop.mapreduce.Job
 +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 +import org.apache.spark.rdd.RDD
 +import org.apache.spark.sql.hive.DistributionUtil
 +
 +import org.carbondata.common.CarbonIterator
 +import org.carbondata.common.logging.LogServiceFactory
 +import org.carbondata.core.cache.dictionary.Dictionary
 +import org.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
 +import org.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
 +import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 +import org.carbondata.scan.executor.QueryExecutorFactory
 +import org.carbondata.scan.expression.Expression
 +import org.carbondata.scan.model.QueryModel
 +import org.carbondata.scan.result.BatchResult
 +import org.carbondata.scan.result.iterator.ChunkRowIterator
 +import org.carbondata.spark.RawValue
 +import org.carbondata.spark.load.CarbonLoaderUtil
 +import org.carbondata.spark.util.QueryPlanUtil
 +
 +class CarbonSparkPartition(rddId: Int, val idx: Int,
-   val locations: Array[String],
-   val tableBlockInfos: util.List[TableBlockInfo])
++    val locations: Array[String],
++    val tableBlockInfos: util.List[TableBlockInfo])
 +  extends Partition {
 +
 +  override val index: Int = idx
 +
 +  // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
 +  override def hashCode(): Int = {
 +    41 * (41 + rddId) + idx
 +  }
 +}
 +
-  /**
-   * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
-   * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
-   * level filtering in driver side.
-   */
++/**
++ * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
++ * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
++ * level filtering in driver side.
++ */
 +class CarbonScanRDD[V: ClassTag](
-   sc: SparkContext,
-   queryModel: QueryModel,
-   filterExpression: Expression,
-   keyClass: RawValue[V],
-   @transient conf: Configuration,
-   tableCreationTime: Long,
-   schemaLastUpdatedTime: Long,
-   baseStoreLocation: String)
++    sc: SparkContext,
++    queryModel: QueryModel,
++    filterExpression: Expression,
++    keyClass: RawValue[V],
++    @transient conf: Configuration,
++    tableCreationTime: Long,
++    schemaLastUpdatedTime: Long,
++    baseStoreLocation: String)
 +  extends RDD[V](sc, Nil) with Logging {
 +
 +  val defaultParallelism = sc.defaultParallelism
 +
 +  override def getPartitions: Array[Partition] = {
 +    val statisticRecorder = new QueryStatisticsRecorder(queryModel.getQueryId)
 +    val startTime = System.currentTimeMillis()
 +    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
 +      QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
 +
 +    val result = new util.ArrayList[Partition](defaultParallelism)
 +    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 +    // set filter resolver tree
 +    try {
 +      // before applying filter check whether segments are available in the table.
 +      val splits = carbonInputFormat.getSplits(job)
 +      if (!splits.isEmpty) {
 +        val filterResolver = carbonInputFormat
 +          .getResolvedFilter(job.getConfiguration, filterExpression)
 +        CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
 +        queryModel.setFilterExpressionResolverTree(filterResolver)
 +      }
 +    }
 +    catch {
 +      case e: Exception =>
 +        LOGGER.error(e)
 +        sys.error("Exception occurred in query execution :: " + e.getMessage)
 +    }
 +    // get splits
 +    val splits = carbonInputFormat.getSplits(job)
 +    if (!splits.isEmpty) {
 +      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 +
 +      val blockList = carbonInputSplits.map(inputSplit =>
 +        new TableBlockInfo(inputSplit.getPath.toString,
 +          inputSplit.getStart, inputSplit.getSegmentId,
 +          inputSplit.getLocations, inputSplit.getLength
 +        ).asInstanceOf[Distributable]
 +      )
 +      if (blockList.nonEmpty) {
 +        // group blocks to nodes, tasks
 +        val startTime = System.currentTimeMillis
 +        var statistic = new QueryStatistic
 +        val activeNodes = DistributionUtil
 +          .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
 +        val nodeBlockMapping =
 +          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
 +            activeNodes.toList.asJava
 +          )
 +        val timeElapsed: Long = System.currentTimeMillis - startTime
 +        statistic.addStatistics("Total Time taken in block(s) allocation", System.currentTimeMillis)
 +        statisticRecorder.recordStatistics(statistic);
 +        statistic = new QueryStatistic
 +        var i = 0
 +        // Create Spark Partition for each task and assign blocks
 +        nodeBlockMapping.asScala.foreach { entry =>
 +          entry._2.asScala.foreach { blocksPerTask => {
 +            val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
 +            if (blocksPerTask.size() != 0) {
 +              result
 +                .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
 +              i += 1
 +            }
 +          }
 +          }
 +        }
 +        val noOfBlocks = blockList.size
 +        val noOfNodes = nodeBlockMapping.size
 +        val noOfTasks = result.size()
 +        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
 +                + s"parallelism: $defaultParallelism , " +
 +                s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
 +        )
 +        statistic.addStatistics("Time taken to identify Block(s) to scan", System.currentTimeMillis)
 +        statisticRecorder.recordStatistics(statistic);
 +        statisticRecorder.logStatistics
 +        result.asScala.foreach { r =>
 +          val cp = r.asInstanceOf[CarbonSparkPartition]
 +          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
 +                  + ", No.Of Blocks : " + cp.tableBlockInfos.size()
 +          )
 +        }
 +      } else {
 +        logInfo("No blocks identified to scan")
 +        val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +        result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +      }
 +    }
 +    else {
 +      logInfo("No valid segments found to scan")
 +      val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +      result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +    }
 +    result.toArray(new Array[Partition](result.size()))
 +  }
 +
-    override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
-      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-      val iter = new Iterator[V] {
-        var rowIterator: CarbonIterator[Array[Any]] = _
-        var queryStartTime: Long = 0
-        try {
-          val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
-          if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
-            queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
-            // fill table block info
-            queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
-            queryStartTime = System.currentTimeMillis
++  override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
++    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
++    val iter = new Iterator[V] {
++      var rowIterator: CarbonIterator[Array[Any]] = _
++      var queryStartTime: Long = 0
++      try {
++        val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
++        if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
++          queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
++          // fill table block info
++          queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
++          queryStartTime = System.currentTimeMillis
 +
-            val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-            logInfo("*************************" + carbonPropertiesFilePath)
-            if (null == carbonPropertiesFilePath) {
-              System.setProperty("carbon.properties.filepath",
-                System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
-            }
-            // execute query
-            rowIterator = new ChunkRowIterator(
-              QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
-                asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
++          val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
++          logInfo("*************************" + carbonPropertiesFilePath)
++          if (null == carbonPropertiesFilePath) {
++            System.setProperty("carbon.properties.filepath",
++              System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
++          }
++          // execute query
++          rowIterator = new ChunkRowIterator(
++            QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
++              asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
 +
-          }
-        } catch {
-          case e: Exception =>
-            LOGGER.error(e)
-            if (null != e.getMessage) {
-              sys.error("Exception occurred in query execution :: " + e.getMessage)
-            } else {
-              sys.error("Exception occurred in query execution.Please check logs.")
-            }
-        }
++        }
++      } catch {
++        case e: Exception =>
++          LOGGER.error(e)
++          if (null != e.getMessage) {
++            sys.error("Exception occurred in query execution :: " + e.getMessage)
++          } else {
++            sys.error("Exception occurred in query execution.Please check logs.")
++          }
++      }
 +
-        var havePair = false
-        var finished = false
-        var recordCount = 0
++      var havePair = false
++      var finished = false
++      var recordCount = 0
 +
-        override def hasNext: Boolean = {
-          if (!finished && !havePair) {
-            finished = (null == rowIterator) || (!rowIterator.hasNext)
-            havePair = !finished
-          }
-          if (finished) {
-            clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-            if (null != queryModel.getStatisticsRecorder) {
-              val queryStatistic = new QueryStatistic
-              queryStatistic
-                .addStatistics("Total Time taken to execute the query in executor Side",
-                  System.currentTimeMillis - queryStartTime
-                )
-              queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
-              queryModel.getStatisticsRecorder.logStatistics();
-            }
-          }
-          !finished
-        }
++      override def hasNext: Boolean = {
++        if (!finished && !havePair) {
++          finished = (null == rowIterator) || (!rowIterator.hasNext)
++          havePair = !finished
++        }
++        if (finished) {
++          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
++          if (null != queryModel.getStatisticsRecorder) {
++            val queryStatistic = new QueryStatistic
++            queryStatistic
++              .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
++                System.currentTimeMillis - queryStartTime
++              )
++            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
++            queryModel.getStatisticsRecorder.logStatistics();
++          }
++        }
++        !finished
++      }
 +
-        override def next(): V = {
-          if (!hasNext) {
-            throw new java.util.NoSuchElementException("End of stream")
-          }
-          havePair = false
-          recordCount += 1
-          if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
-            clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-            if (null != queryModel.getStatisticsRecorder) {
-              val queryStatistic = new QueryStatistic
-              queryStatistic
-                .addStatistics("Total Time taken to execute the query in executor Side",
-                  System.currentTimeMillis - queryStartTime
-                )
-              queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
-              queryModel.getStatisticsRecorder.logStatistics();
-            }
-          }
-          keyClass.getValue(rowIterator.next())
-        }
-        def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
-          if (null != columnToDictionaryMap) {
-            org.carbondata.spark.util.CarbonQueryUtil
-              .clearColumnDictionaryCache(columnToDictionaryMap)
-          }
-        }
-      }
-      iter
-    }
++      override def next(): V = {
++        if (!hasNext) {
++          throw new java.util.NoSuchElementException("End of stream")
++        }
++        havePair = false
++        recordCount += 1
++        if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
++          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
++          if (null != queryModel.getStatisticsRecorder) {
++            val queryStatistic = new QueryStatistic
++            queryStatistic
++              .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
++                System.currentTimeMillis - queryStartTime
++              )
++            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
++            queryModel.getStatisticsRecorder.logStatistics();
++          }
++        }
++        keyClass.getValue(rowIterator.next())
++      }
++      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
++        if (null != columnToDictionaryMap) {
++          org.carbondata.spark.util.CarbonQueryUtil
++            .clearColumnDictionaryCache(columnToDictionaryMap)
++        }
++      }
++    }
++    iter
++  }
 +
-    /**
-     * Get the preferred locations where to launch this task.
-     */
-    override def getPreferredLocations(partition: Partition): Seq[String] = {
-      val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-      val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
-      val tableBlocks = theSplit.tableBlockInfos
-      // node name and count mapping
-      val blockMap = new util.LinkedHashMap[String, Integer]()
++  /**
++   * Get the preferred locations where to launch this task.
++   */
++  override def getPreferredLocations(partition: Partition): Seq[String] = {
++    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
++    val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
++    val tableBlocks = theSplit.tableBlockInfos
++    // node name and count mapping
++    val blockMap = new util.LinkedHashMap[String, Integer]()
 +
-      tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
-        location => {
-          if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-            val currentCount = blockMap.get(location)
-            if (currentCount == null) {
-              blockMap.put(location, 1)
-            } else {
-              blockMap.put(location, currentCount + 1)
-            }
-          }
-        }
-      )
-      )
++    tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
++      location => {
++        if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
++          val currentCount = blockMap.get(location)
++          if (currentCount == null) {
++            blockMap.put(location, 1)
++          } else {
++            blockMap.put(location, currentCount + 1)
++          }
++        }
++      }
++    )
++    )
 +
-      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-        nodeCount1.getValue > nodeCount2.getValue
-      }
-      )
++    val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
++      nodeCount1.getValue > nodeCount2.getValue
++    }
++    )
 +
-      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-      firstOptionLocation ++ sortedNodesList
-    }
++    val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
++    firstOptionLocation ++ sortedNodesList
++  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 5406e77,87dd0ce..8b91745
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@@ -17,9 -17,13 +17,12 @@@
  
  package org.carbondata.spark.util
  
+ import java.io.File
+ 
  import scala.collection.JavaConverters._
  
+ import org.apache.spark.Logging
  import org.apache.spark.sql._
 -import org.apache.spark.sql.execution.command.Level
  import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
  import org.apache.spark.sql.types._
  
@@@ -27,9 -31,11 +30,10 @@@ import org.carbondata.core.carbon.metad
  import org.carbondata.core.carbon.metadata.encoder.Encoding
  import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
  import org.carbondata.core.constants.CarbonCommonConstants
+ import org.carbondata.core.datastorage.store.impl.FileFactory
 -import org.carbondata.core.util.CarbonProperties
 -import org.carbondata.query.expression.{DataType => CarbonDataType}
 +import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
  
- object CarbonScalaUtil {
+ object CarbonScalaUtil extends Logging {
    def convertSparkToCarbonDataType(
        dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
      dataType match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------