Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:29 UTC

[04/50] [abbrv] carbondata git commit: [CARBONDATA-2489] Coverity scan fixes

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 7ddd181..ffe1aef 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -49,7 +49,7 @@ public class IntegerStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(isDictionary) {
+        if (isDictionary) {
           populateDictionaryVector(type, numberOfRows, builder);
         } else {
           if (columnVector.anyNullsSet()) {
@@ -62,10 +62,8 @@ public class IntegerStreamReader extends AbstractStreamReader {
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (streamData != null) {
-        for (int i = 0; i < numberOfRows; i++) {
-          type.writeLong(builder, ((Integer) streamData[i]).longValue());
-        }
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, ((Integer) streamData[i]).longValue());
       }
     }
     return builder.build();
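
The six stream-reader hunks in this commit (Integer, Long, Object, Short, Slice, Timestamp) all apply the same Coverity fix: streamData.length is dereferenced one line before the streamData != null guard, so the guard can never be false on a reachable path and is removed. A minimal Java sketch of the before/after pattern; the class, method and parameter names are illustrative, not from the CarbonData sources:

    final class DeadNullCheck {
      // Before: "dereference before null check" -- if data were null, the
      // first statement would already have thrown NullPointerException, so
      // the guard below can never be false on a reachable path.
      static int before(Object[] data) {
        int n = data.length;
        if (data != null) {
          return n;
        }
        return 0;
      }

      // After: the dead guard is removed; behavior is unchanged.
      static int after(Object[] data) {
        return data.length;
      }
    }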

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 015ac80..e1000c5 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -49,7 +49,7 @@ public class LongStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(isDictionary) {
+        if (isDictionary) {
           populateDictionaryVector(type, numberOfRows, builder);
         }
         if (columnVector.anyNullsSet()) {
@@ -61,10 +61,8 @@ public class LongStreamReader extends AbstractStreamReader {
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (streamData != null) {
-        for (int i = 0; i < numberOfRows; i++) {
-          type.writeLong(builder, (Long) streamData[i]);
-        }
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Long) streamData[i]);
       }
     }
     return builder.build();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
index 82d62ad..8952712 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -41,32 +41,26 @@ public class ObjectStreamReader  extends AbstractStreamReader {
    * @return
    * @throws IOException
    */
-  public Block readBlock(Type type)
-      throws IOException
-  {
+  public Block readBlock(Type type) throws IOException {
     int numberOfRows = 0;
     BlockBuilder builder = null;
-    if(isVectorReader) {
+    if (isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        for(int i = 0; i < numberOfRows ; i++ ){
+        for (int i = 0; i < numberOfRows; i++) {
           type.writeObject(builder, columnVector.getData(i));
         }
       }
-
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (streamData != null) {
-        for(int i = 0; i < numberOfRows ; i++ ){
-          type.writeObject(builder, streamData[i]);
-        }
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeObject(builder, streamData[i]);
       }
     }
 
     return builder.build();
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 00e5485..51f1cd5 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -49,7 +49,7 @@ public class ShortStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(isDictionary) {
+        if (isDictionary) {
           populateDictionaryVector(type, numberOfRows, builder);
         } else {
           if (columnVector.anyNullsSet()) {
@@ -59,13 +59,11 @@ public class ShortStreamReader extends AbstractStreamReader {
           }
         }
       }
-   } else {
+    } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (streamData != null) {
-        for (int i = 0; i < numberOfRows; i++) {
-          type.writeLong(builder, (Short) streamData[i]);
-        }
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Short) streamData[i]);
       }
     }
     return builder.build();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index d98afa3..cce35e0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -72,7 +72,7 @@ public class SliceStreamReader extends AbstractStreamReader {
           }
           return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
         } else {
-          if(columnVector.anyNullsSet()) {
+          if (columnVector.anyNullsSet()) {
             handleNullInVector(type, numberOfRows, builder);
           } else {
             populateVector(type, numberOfRows, builder);
@@ -82,10 +82,8 @@ public class SliceStreamReader extends AbstractStreamReader {
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (streamData != null) {
-        for (int i = 0; i < numberOfRows; i++) {
-          type.writeSlice(builder, utf8Slice(streamData[i].toString()));
-        }
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeSlice(builder, utf8Slice(streamData[i].toString()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 01b7939..a22ef29 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -39,10 +39,9 @@ public class TimestampStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet()) {
+        if (columnVector.anyNullsSet()) {
           handleNullInVector(type, numberOfRows, builder);
-        }
-        else {
+        } else {
           populateVector(type, numberOfRows, builder);
         }
       }
@@ -50,10 +49,8 @@ public class TimestampStreamReader extends AbstractStreamReader {
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      if (streamData != null) {
-        for (int i = 0; i < numberOfRows; i++) {
-          type.writeLong(builder, (Long) streamData[i]);
-        }
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Long) streamData[i]);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index d45e759..49aa7ff 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql.common.util
 
-import java.io.{FileInputStream, ObjectInputStream, ObjectOutputStream}
-import java.math
-import java.math.RoundingMode
+import java.io.{ObjectInputStream, ObjectOutputStream}
 import java.util.{Locale, TimeZone}
 
 import org.apache.carbondata.common.logging.LogServiceFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
index 1e98ec2..f4948c4 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
@@ -38,7 +38,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.netty.SparkTransportConf;
 import org.apache.spark.network.sasl.SaslServerBootstrap;
-import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
 import org.apache.spark.network.util.TransportConf;
 import scala.Some;
@@ -144,8 +143,7 @@ public class SecureDictionaryServer extends AbstractDictionaryServer implements
         TransportServerBootstrap bootstrap =
             new SaslServerBootstrap(transportConf, securityManager);
         String host = findLocalIpAddress(LOGGER);
-        TransportServer transportServer = context
-            .createServer(host, port, Lists.<TransportServerBootstrap>newArrayList(bootstrap));
+        context.createServer(host, port, Lists.<TransportServerBootstrap>newArrayList(bootstrap));
         LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
             + " Listening on port " + newPort);
         this.port = newPort;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
deleted file mode 100644
index 9936a2a..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.Value
-
-class CarbonCleanFilesRDD[V: ClassTag](
-    sc: SparkContext,
-    valueClass: Value[V],
-    databaseName: String,
-    tableName: String,
-    partitioner: Partitioner)
-  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
-  }
-
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-    val iter = new Iterator[(V)] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      // TODO call CARBON delete API
-
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        valueClass.getValue(null)
-      }
-
-    }
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name: " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
deleted file mode 100644
index b11dfad..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.DeletedLoadResult
-
-class CarbonDeleteLoadByDateRDD[K, V](
-    sc: SparkContext,
-    result: DeletedLoadResult[K, V],
-    databaseName: String,
-    tableName: String,
-    dateField: String,
-    dateFieldActualName: String,
-    dateValue: String,
-    factTableName: String,
-    dimTableName: String,
-    storePath: String,
-    loadMetadataDetails: List[LoadMetadataDetails])
-  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map {s =>
-      new CarbonLoadPartition(id, s._2, s._1)
-    }
-  }
-
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
-      // TODO call CARBON delete API
-      logInfo("Applying data retention as per date value " + dateValue)
-      var dateFormat = ""
-      try {
-        dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-      } catch {
-        case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
-      }
-      // TODO: Implement it
-      val finished = false
-
-      override def hasNext: Boolean = {
-        finished
-      }
-
-      override def next(): (K, V) = {
-        result.getKey(null, null)
-      }
-    }
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name: " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
deleted file mode 100644
index 759ed42..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.Value
-
-class CarbonDeleteLoadRDD[V: ClassTag](
-    sc: SparkContext,
-    valueClass: Value[V],
-    loadId: Int,
-    databaseName: String,
-    tableName: String,
-    partitioner: Partitioner)
-  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map {f =>
-      new CarbonLoadPartition(id, f._2, f._1)
-    }
-  }
-
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-    val iter = new Iterator[V] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      // TODO call CARBON delete API
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        valueClass.getValue(null)
-      }
-
-    }
-    logInfo("********Deleting***************")
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name: " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
deleted file mode 100644
index f327d88..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.Value
-
-class CarbonDropTableRDD[V: ClassTag](
-    sc: SparkContext,
-    valueClass: Value[V],
-    databaseName: String,
-    tableName: String)
-  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map { s =>
-      new CarbonLoadPartition(id, s._2, s._1)
-    }
-  }
-
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
-      val iter = new Iterator[V] {
-      // TODO: Clear Btree from memory
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        valueClass.getValue(null)
-      }
-    }
-    iter
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index c2c4ab3..5c3ace3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -153,6 +153,15 @@ object QueryTest {
         Row.fromSeq(s.toSeq.map {
           case d: java.math.BigDecimal => BigDecimal(d)
           case b: Array[Byte] => b.toSeq
+          case d : Double =>
+            if (!d.isInfinite && !d.isNaN) {
+              var bd = BigDecimal(d)
+              bd = bd.setScale(5, BigDecimal.RoundingMode.UP)
+              bd.doubleValue()
+            }
+            else {
+              d
+            }
           case o => o
         })
       }
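
The hunk above normalizes Double results in the cluster tests: finite values are rounded to five decimal places with RoundingMode.UP before row comparison, so small floating-point noise does not fail an equality check, while NaN and infinities pass through because they have no BigDecimal representation. A small Java rendering of the same normalization, as a sketch only; the class and method names are ours:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    final class DoubleNormalizer {
      // Round finite doubles to 5 decimal places for tolerant comparison;
      // NaN/Infinity cannot be represented as BigDecimal and are returned as-is.
      static double normalize(double d) {
        if (Double.isNaN(d) || Double.isInfinite(d)) {
          return d;
        }
        return BigDecimal.valueOf(d)
            .setScale(5, RoundingMode.UP)
            .doubleValue();
      }
    }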

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 903bf44..082ef8b 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.InputMetricsStats;
 import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
@@ -134,24 +135,17 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
       queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
       iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
     } catch (QueryExecutionException e) {
-      Throwable ext = e;
-      while (ext != null) {
-        if (ext instanceof FileNotFoundException) {
-          throw new InterruptedException(
-              "Insert overwrite may be in progress.Please check " + e.getMessage());
-        }
-        ext = ext.getCause();
+      if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+        LOGGER.error(e);
+        throw new InterruptedException(
+            "Insert overwrite may be in progress.Please check " + e.getMessage());
       }
       throw new InterruptedException(e.getMessage());
     } catch (Exception e) {
-      Throwable ext = e;
-      while (ext != null) {
-        if (ext instanceof FileNotFoundException) {
-          LOGGER.error(e);
-          throw new InterruptedException(
-              "Insert overwrite may be in progress.Please check " + e.getMessage());
-        }
-        ext = ext.getCause();
+      if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+        LOGGER.error(e);
+        throw new InterruptedException(
+            "Insert overwrite may be in progress.Please check " + e.getMessage());
       }
       throw e;
     }
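
Here the hand-rolled while loop over getCause() is replaced with commons-lang3's ExceptionUtils.indexOfThrowable, which returns the 0-based position of the first throwable of the given class in the cause chain, or -1 when it is absent. The hunk's "> 0" test is safe in context because index 0 is the just-caught wrapper exception itself; a standalone search would use ">= 0", as in this minimal sketch (the method name is illustrative):

    import java.io.FileNotFoundException;
    import org.apache.commons.lang3.exception.ExceptionUtils;

    final class CauseChain {
      // General form of the search that replaces the manual getCause() loop:
      // true if a FileNotFoundException appears anywhere in the cause chain.
      static boolean causedByMissingFile(Throwable t) {
        return ExceptionUtils.indexOfThrowable(t, FileNotFoundException.class) >= 0;
      }
    }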

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
index d668329..58ec0d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -24,8 +24,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -47,7 +47,7 @@ public class BadRecordsLogger {
    * the status
    */
   private static Map<String, String> badRecordEntry =
-      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      new ConcurrentHashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   /**
    * File Name
    */
@@ -121,6 +121,9 @@ public class BadRecordsLogger {
 
   public void addBadRecordsToBuilder(Object[] row, String reason)
       throws CarbonDataLoadingException {
+    // setting partial success entry since even if bad records are there then load
+    // status should be partial success regardless of bad record logged
+    badRecordEntry.put(taskKey, "Partially");
     if (badRecordsLogRedirect || badRecordLoggerEnable) {
       StringBuilder logStrings = new StringBuilder();
       int size = row.length;
@@ -158,10 +161,6 @@ public class BadRecordsLogger {
         }
         writeBadRecordsToFile(logStrings);
       }
-    } else {
-      // setting partial success entry since even if bad records are there then load
-      // status should be partial success regardless of bad record logged
-      badRecordEntry.put(taskKey, "Partially");
     }
   }
 
@@ -200,11 +199,6 @@ public class BadRecordsLogger {
     } catch (IOException e) {
       LOGGER.error("Error While writing bad record log File");
       throw new CarbonDataLoadingException("Error While writing bad record log File", e);
-    } finally {
-      // if the Bad record file is created means it partially success
-      // if any entry present with key that means its have bad record for
-      // that key
-      badRecordEntry.put(taskKey, "Partially");
     }
   }
 
@@ -246,9 +240,6 @@ public class BadRecordsLogger {
       LOGGER.error("Error While writing bad record csv File");
       throw new CarbonDataLoadingException("Error While writing bad record csv File", e);
     }
-    finally {
-      badRecordEntry.put(taskKey, "Partially");
-    }
   }
 
   public boolean isBadRecordConvertNullDisable() {
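
Two things change in BadRecordsLogger: the shared static badRecordEntry map becomes a ConcurrentHashMap, since multiple load tasks write their task key concurrently and HashMap gives no thread-safety guarantees under concurrent put; and the "Partially" status is now recorded once up front rather than in three separate else/finally branches. A sketch of the concurrent-map part with illustrative names (note that ConcurrentHashMap rejects null keys and values):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class LoadStatusRegistry {
      private static final Map<String, String> STATUS = new ConcurrentHashMap<>();

      static void markPartialSuccess(String taskKey) {
        STATUS.put(taskKey, "Partially");   // atomic, no external lock required
      }

      static boolean hasBadRecords(String taskKey) {
        return STATUS.containsKey(taskKey); // safe concurrent read
      }
    }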

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
index ed35a96..12f6927 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
@@ -39,17 +39,18 @@ public class ThreadStatusObserver {
     // should assign the throwable object else the actual cause for failure can be overridden as
     // all the running threads will throw interrupted exception on calling shutdownNow and
     // will override the throwable object
-    if (null == this.throwable) {
-      synchronized (lock) {
-        if (null == this.throwable) {
-          executorService.shutdownNow();
-          this.throwable = throwable;
-        }
+    synchronized (lock) {
+      if (null == this.throwable) {
+        executorService.shutdownNow();
+        this.throwable = throwable;
       }
     }
   }
 
   public Throwable getThrowable() {
-    return throwable;
+
+    synchronized (lock) {
+      return throwable;
+    }
   }
 }
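
The removed outer null check was double-checked locking on a field that is not volatile, so the unsynchronized first read has no happens-before edge: a thread could observe a stale null and race past the guard, and getThrowable() could likewise return a stale value. The fix takes the lock on every access, as sketched with an illustrative class (the real setter also calls executorService.shutdownNow() inside the guard):

    final class FirstFailureHolder {
      private final Object lock = new Object();
      private Throwable throwable;          // guarded by lock; no volatile needed

      void notifyFailed(Throwable t) {
        synchronized (lock) {
          if (throwable == null) {          // keep only the first failure
            throwable = t;
          }
        }
      }

      Throwable getThrowable() {
        synchronized (lock) {               // read under the same lock
          return throwable;
        }
      }
    }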

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 80887c1..cb72f54 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -164,7 +164,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         LOGGER.error(e);
         this.threadStatusObserver.notifyFailed(e);
       } finally {
-        sortDataRows.finishThread();
+        synchronized (sortDataRows) {
+          sortDataRows.finishThread();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 1c6ce8d..fb0bcc3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -109,7 +109,6 @@ public class UnsafeSortDataRows {
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
     this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-    this.inMemoryChunkSize = inMemoryChunkSize;
     this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
     enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
@@ -121,7 +120,7 @@ public class UnsafeSortDataRows {
       // in sort memory size.
       this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
     } else {
-      this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
+      this.maxSizeAllowed = this.maxSizeAllowed * 1024L * 1024L;
     }
   }
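
Two findings are fixed in UnsafeSortDataRows: a dead store (inMemoryChunkSize was assigned twice, the first value immediately overwritten) and an overflow-before-widen risk in maxSizeAllowed * 1024 * 1024, where all-int multiplication can wrap before the result is widened to long. A runnable illustration of the second, with an illustrative size value:

    public class OverflowBeforeWiden {
      public static void main(String[] args) {
        int sizeInMb = 3000;                     // e.g. a configured size in MB
        long bad  = sizeInMb * 1024 * 1024;      // int math wraps before widening
        long good = sizeInMb * 1024L * 1024L;    // long math from the first multiply
        System.out.println(bad + " vs " + good); // -1149239296 vs 3145728000
      }
    }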
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
index 01e7649..a65de16 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.AbstractQueue;
+import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -215,7 +216,11 @@ public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromMemory();
+    if (hasNext()) {
+      return getSortedRecordFromMemory();
+    } else {
+      throw new NoSuchElementException("No more elements to return");
+    }
   }
 
   /**
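
This guard -- throw NoSuchElementException instead of reading past the end -- is applied uniformly in this commit to the next() implementations of UnsafeInMemoryIntermediateDataMerger, UnsafeIntermediateFileMerger, IntermediateFileMerger, SingleThreadFinalSortFilesMerger and UnsafeSingleThreadFinalSortFilesMerger, matching the java.util.Iterator contract. A self-contained sketch over an illustrative array-backed source (the mergers read from memory pages or sort temp files instead):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    final class GuardedIterator implements Iterator<Object[]> {
      private final Object[][] rows;
      private int cursor;

      GuardedIterator(Object[][] rows) { this.rows = rows; }

      @Override public boolean hasNext() { return cursor < rows.length; }

      @Override public Object[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException("No more elements to return");
        }
        return rows[cursor++];
      }
    }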

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 22673ff..c5b215e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.AbstractQueue;
+import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 
@@ -238,7 +239,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromFile();
+    if (hasNext()) {
+      return getSortedRecordFromFile();
+    } else {
+      throw new NoSuchElementException("No more elements to return");
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 0c692c7..8d2c52a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -38,6 +38,8 @@ import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 
+import org.apache.commons.collections.list.SynchronizedList;
+
 /**
  * It does mergesort intermediate files to big file.
  */
@@ -76,7 +78,8 @@ public class UnsafeIntermediateMerger {
     this.mergedPages = new ArrayList<>();
     this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
         new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
-    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    this.procFiles =
+        SynchronizedList.decorate(new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
     this.mergerTask = new ArrayList<>();
 
     Integer spillPercentage;
@@ -111,15 +114,18 @@ public class UnsafeIntermediateMerger {
   }
 
   public void startFileMergingIfPossible() {
-    File[] fileList;
-    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      synchronized (lockObject) {
+    File[] fileList = null;
+    synchronized (lockObject) {
+      if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
         fileList = procFiles.toArray(new File[procFiles.size()]);
         this.procFiles = new ArrayList<File>();
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER
+              .debug("Submitting request for intermediate merging no of files: " + fileList.length);
+        }
       }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
-      }
+    }
+    if (null != fileList) {
       startIntermediateMerging(fileList);
     }
   }
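
The old code read procFiles.size() outside the lock and logged fileList in a branch where it could still be unassigned: a check-then-act race plus a possible use of an uninitialized local. The fix performs the threshold check, the snapshot and the list swap under one lock, and submits the merge only when a snapshot was actually taken. A sketch of the pattern; field names mirror the hunk, the threshold constant is illustrative:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    final class MergeScheduler {
      private final Object lockObject = new Object();
      private List<File> procFiles = new ArrayList<>();
      private static final int THRESHOLD = 20;

      void startFileMergingIfPossible() {
        File[] fileList = null;
        synchronized (lockObject) {
          if (procFiles.size() >= THRESHOLD) {     // check and act atomically
            fileList = procFiles.toArray(new File[0]);
            procFiles = new ArrayList<>();         // swap in a fresh list
          }
        }
        if (fileList != null) {
          startIntermediateMerging(fileList);      // heavy work outside the lock
        }
      }

      private void startIntermediateMerging(File[] files) { /* submit merge task */ }
    }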

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 073d13b..6defeb7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 
 import org.apache.carbondata.common.CarbonIterator;
@@ -195,8 +196,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    * @return sorted row
    */
   public Object[] next() {
-    IntermediateSortTempRow sortTempRow =  getSortedRecordFromFile();
-    return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+    if (hasNext()) {
+      IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+      return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+    } else {
+      throw new NoSuchElementException("No more elements to return");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index 4078a13..d0e78fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -96,6 +96,10 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     // to be launched.
     int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
 
+    if (parallelThreadNumber <= 0) {
+      parallelThreadNumber = 1;
+    }
+
     List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
     for (int i = 0; i < parallelThreadNumber; i++) {
       iterators[i] = new ArrayList<>();
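
Math.min(inputIterators.length, numberOfCores) is 0 when no input iterators are supplied, which would size the iterator buckets (and the threads launched from them) at zero; the commit clamps the parallelism to at least one. The same clamp as a single expression, for illustration:

    final class Parallelism {
      static int of(int inputIterators, int numberOfCores) {
        return Math.max(1, Math.min(inputIterators, numberOfCores)); // never below 1
      }
    }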

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 1744675..81031de 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1108,23 +1108,24 @@ public final class CarbonDataMergerUtil {
 
       CarbonFile[] deleteDeltaFiles =
           segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
+      if (null != deleteDeltaFiles) {
+        // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
+        // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
+        // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
+        // Spill Over Blocks will choose files with unique taskID.
+        for (CarbonFile blocks : deleteDeltaFiles) {
+          // Get Task ID and the Timestamp from the Block name for e.g.
+          // part-0-3-1481084721319.carbondata => "3-1481084721319"
+          String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+          String timestamp =
+              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+          String taskAndTimeStamp = task + "-" + timestamp;
+          uniqueBlocks.add(taskAndTimeStamp);
+        }
 
-      // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
-      // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
-      // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
-      // Spill Over Blocks will choose files with unique taskID.
-      for (CarbonFile blocks : deleteDeltaFiles) {
-        // Get Task ID and the Timestamp from the Block name for e.g.
-        // part-0-3-1481084721319.carbondata => "3-1481084721319"
-        String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
-        String timestamp =
-            CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
-        String taskAndTimeStamp = task + "-" + timestamp;
-        uniqueBlocks.add(taskAndTimeStamp);
-      }
-
-      if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
-        return true;
+        if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
+          return true;
+        }
       }
     }
     return false;
@@ -1152,7 +1153,7 @@ public final class CarbonDataMergerUtil {
       CarbonFile[] deleteDeltaFiles =
           segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
 
-      if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
+      if (null != deleteDeltaFiles && (deleteDeltaFiles.length > numberDeltaFilesThreshold)) {
         blockLists.add(seg.getSegmentNo() + "/" + blockName);
       }
     }
@@ -1200,31 +1201,34 @@ public final class CarbonDataMergerUtil {
 
     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
-    String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
-        + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
-
-    List<String> deleteFilePathList = new ArrayList<String>();
-    for (CarbonFile cFile : deleteDeltaFiles) {
-      deleteFilePathList.add(cFile.getCanonicalPath());
-    }
+    List<String> deleteFilePathList = new ArrayList<>();
+    if (null != deleteDeltaFiles && deleteDeltaFiles.length > 0 && null != deleteDeltaFiles[0]
+        .getParentFile()) {
+      String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
+          + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
+
+      for (CarbonFile cFile : deleteDeltaFiles) {
+        deleteFilePathList.add(cFile.getCanonicalPath());
+      }
 
-    CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
-    blockDetails.setBlockName(blockName);
-    blockDetails.setSegmentName(seg);
-    blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString());
-    blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString());
+      CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
+      blockDetails.setBlockName(blockName);
+      blockDetails.setSegmentName(seg);
+      blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString());
+      blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString());
 
-    try {
-      if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) {
-        blockDetails.setCompactionStatus(true);
-      } else {
-        blockDetails.setCompactionStatus(false);
+      try {
+        if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) {
+          blockDetails.setCompactionStatus(true);
+        } else {
+          blockDetails.setCompactionStatus(false);
+        }
+        resultList.add(blockDetails);
+      } catch (IOException e) {
+        LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
+            + fullBlockFilePath);
+        throw new IOException();
       }
-      resultList.add(blockDetails);
-    } catch (IOException e) {
-      LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
-          + fullBlockFilePath);
-      throw new IOException();
     }
     return resultList;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 442f1c5..9a3258e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -138,21 +138,23 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
       // if record holder is not empty then iterator the slice holder from
       // heap
       iterator = this.recordHolderHeap.poll();
-      while (true) {
-        Object[] convertedRow = iterator.next();
-        if (null == convertedRow) {
-          iterator.close();
-          break;
-        }
-        // do it only once
-        if (!isDataPresent) {
-          dataHandler.initialise();
-          isDataPresent = true;
-        }
-        addRow(convertedRow);
-        // check if leaf contains no record
-        if (!iterator.hasNext()) {
-          break;
+      if (null != iterator) {
+        while (true) {
+          Object[] convertedRow = iterator.next();
+          if (null == convertedRow) {
+            iterator.close();
+            break;
+          }
+          // do it only once
+          if (!isDataPresent) {
+            dataHandler.initialise();
+            isDataPresent = true;
+          }
+          addRow(convertedRow);
+          // check if leaf contains no record
+          if (!iterator.hasNext()) {
+            break;
+          }
         }
       }
       if (isDataPresent)
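
recordHolderHeap is a priority queue, and poll() returns null rather than throwing when the heap is empty; the old loop called iterator.next() on that result unconditionally. A runnable reminder of the poll() contract, with illustrative data:

    import java.util.PriorityQueue;

    final class PollGuard {
      public static void main(String[] args) {
        PriorityQueue<String> heap = new PriorityQueue<>();
        String head = heap.poll();        // null: the heap is empty
        if (head != null) {               // the commit adds exactly this kind of guard
          System.out.println(head.length());
        }
      }
    }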

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
deleted file mode 100644
index b6f12a5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.partition.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.partition.DataPartitioner;
-import org.apache.carbondata.processing.partition.Partition;
-
-
-public final class QueryPartitionHelper {
-  private static QueryPartitionHelper instance = new QueryPartitionHelper();
-  private Map<String, DataPartitioner> partitionerMap =
-      new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  private Map<String, DefaultLoadBalancer> loadBalancerMap =
-      new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private QueryPartitionHelper() {
-
-  }
-
-  public static QueryPartitionHelper getInstance() {
-    return instance;
-  }
-
-  /**
-   * Get partitions applicable for query based on filters applied in query
-   */
-  public List<Partition> getPartitionsForQuery(String databaseName, String tableName) {
-    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    return dataPartitioner.getPartitions();
-  }
-
-  public List<Partition> getAllPartitions(String databaseName, String tableName) {
-    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    return dataPartitioner.getAllPartitions();
-  }
-
-  /**
-   * Get the node name where the partition is assigned to.
-   */
-  public String getLocation(Partition partition, String databaseName, String tableName) {
-    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
-
-    DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
-    return loadBalancer.getNodeForPartitions(partition);
-  }
-
-}

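QueryPartitionHelper is deleted outright rather than patched. Within the file, partitionerMap and loadBalancerMap are read but never populated, so every get() can only return null and the subsequent dereference is a guaranteed NullPointerException, the kind of forward-null defect Coverity reports. A hypothetical sketch of that defect pattern, plus the fail-fast guard the original lacked:

  import java.util.HashMap;
  import java.util.Map;

  final class UnpopulatedMapLookup {
    // read-only in this class, so get() can only ever return null
    private static final Map<String, Runnable> REGISTRY = new HashMap<>();

    static void run(String key) {
      Runnable task = REGISTRY.get(key);
      if (task == null) {
        // fail fast instead of dereferencing a null lookup result
        throw new IllegalStateException("No task registered for " + key);
      }
      task.run();
    }
  }
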
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index c06819c..364515c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.AbstractQueue;
+import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 
@@ -237,7 +238,11 @@ public class IntermediateFileMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromFile();
+    if (hasNext()) {
+      return getSortedRecordFromFile();
+    } else {
+      throw new NoSuchElementException("No more elements to return");
+    }
   }
 
   /**

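The change above brings next() in line with the java.util.Iterator contract: once the stream is exhausted, next() must throw NoSuchElementException rather than attempt another read. A minimal sketch of the guarded pattern, independent of the sort-temp machinery:

  import java.util.Iterator;
  import java.util.NoSuchElementException;

  final class GuardedReader implements Iterator<String> {
    private final String[] records;
    private int cursor;

    GuardedReader(String... records) {
      this.records = records;
    }

    @Override public boolean hasNext() {
      return cursor < records.length;
    }

    @Override public String next() {
      if (!hasNext()) {
        // honour the Iterator contract instead of reading past the end
        throw new NoSuchElementException("No more elements to return");
      }
      return records[cursor++];
    }
  }
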
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 1a839a2..09c1920 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -23,6 +23,7 @@ import java.util.AbstractQueue;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -233,8 +234,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    * @throws CarbonSortKeyAndGroupByException
    */
   public Object[] next() {
-    IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
-    return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+    if (hasNext()) {
+      IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+      return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+    } else {
+      throw new NoSuchElementException("No more elements to return");
+    }
   }
 
   /**

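SingleThreadFinalSortFilesMerger receives the identical guard; since it extends CarbonIterator<Object[]>, the fix matters for any caller written against the standard iteration idiom. A caller-side sketch, reusing the GuardedReader class from the previous example (assumed to be on the classpath):

  final class GuardedReaderDemo {
    public static void main(String[] args) {
      // hasNext() is consulted before every next(), so the guard never fires;
      // it only protects callers that invoke next() blindly after exhaustion.
      GuardedReader reader = new GuardedReader("row-1", "row-2");
      while (reader.hasNext()) {
        System.out.println(reader.next());
      }
    }
  }
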
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 8115f97..9d0c933 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -43,7 +43,6 @@ import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -56,8 +55,6 @@ import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 
-import org.apache.commons.lang3.ArrayUtils;
-
 public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
   private static final LogService LOGGER =
@@ -168,13 +165,13 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
             CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
     this.blockSizeThreshold =
         fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
-    LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
-        blockSizeThreshold);
+    LOGGER
+        .info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + blockSizeThreshold);
 
     // whether to directly write fact data to HDFS
-    String directlyWriteData2Hdfs = propInstance.getProperty(
-        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
+    String directlyWriteData2Hdfs = propInstance
+        .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
     this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
 
     if (enableDirectlyWriteData2Hdfs) {
@@ -189,22 +186,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     // in case of compaction we will pass the cardinality.
     this.localCardinality = this.model.getColCardinality();
 
-    //TODO: We should delete the levelmetadata file after reading here.
-    // so only data loading flow will need to read from cardinality file.
-    if (null == this.localCardinality) {
-      this.localCardinality = CarbonMergerUtil
-          .getCardinalityFromLevelMetadata(this.model.getStoreLocation(),
-              this.model.getTableName());
-      List<Integer> cardinalityList = new ArrayList<Integer>();
-      thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
-          this.model.getWrapperColumnSchema());
-      localCardinality =
-          ArrayUtils.toPrimitive(cardinalityList.toArray(new Integer[cardinalityList.size()]));
-    } else { // for compaction case
-      List<Integer> cardinalityList = new ArrayList<Integer>();
-      thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
-          this.model.getWrapperColumnSchema());
-    }
+    List<Integer> cardinalityList = new ArrayList<Integer>();
+    thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
+        this.model.getWrapperColumnSchema());
     this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));

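Besides the logging and property-lookup reflows, the writer drops a branch Coverity considers dead: localCardinality is always supplied by the model in this flow, so the null arm that re-read cardinality from level metadata never executed, and both arms ended in the same getColumnSchemaListAndCardinality call. With the branch gone, the CarbonMergerUtil and ArrayUtils imports become unused and are removed as well. A hedged sketch of the collapse, with hypothetical stand-in names:

  import java.util.ArrayList;
  import java.util.List;

  final class DeadBranchCollapse {
    // before: if (cardinality == null) { reload it, then build the list }
    //         else                     { build the list }
    // after:  build the list unconditionally
    static List<Integer> buildCardinalityList(int[] cardinality) {
      List<Integer> cardinalityList = new ArrayList<>();
      for (int value : cardinality) {
        cardinalityList.add(value);
      }
      return cardinalityList;
    }
  }
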
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
index 4abdf3c..8e23489 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -17,19 +17,8 @@
 
 package org.apache.carbondata.processing.util;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.QueryProjection;
-import org.apache.carbondata.processing.partition.Partition;
-import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
-import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.processing.partition.impl.QueryPartitionHelper;
-import org.apache.carbondata.processing.splits.TableSplit;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -43,58 +32,6 @@ public class CarbonQueryUtil {
   }
 
   /**
-   * It creates the one split for each region server.
-   */
-  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      QueryProjection queryPlan) {
-
-    //Just create splits depends on locations of region servers
-    List<Partition> allPartitions = null;
-    if (queryPlan == null) {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
-    } else {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName);
-    }
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = QueryPartitionHelper.getInstance()
-          .getLocation(partition, databaseName, tableName);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-
-    return splits;
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) {
-
-    //Just create splits depends on locations of region servers
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
-    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
-    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < tblSplits.length; i++) {
-      tblSplits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      tblSplits[i].setPartition(partition);
-      tblSplits[i].setLocations(locations);
-    }
-    return tblSplits;
-  }
-
-  /**
    * split sourcePath by comma
    */
   public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
@@ -104,21 +41,4 @@ public class CarbonQueryUtil {
       Collections.addAll(partitionsFiles, files);
     }
   }
-
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
-    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(0).add(files.get(i));
-    }
-    return partitionList;
-  }
-
 }
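
With the unused split helpers gone, splitFilePath is all that remains of CarbonQueryUtil here. Its elided third parameter is the separator, as the deleted call site (which passed CarbonCommonConstants.COMMA) shows. A small usage sketch, assuming "," is that constant's value:

  import java.util.ArrayList;
  import java.util.List;

  public final class SplitFilePathDemo {
    public static void main(String[] args) {
      List<String> partitionsFiles = new ArrayList<>();
      // splits the comma-separated source path into individual file paths
      CarbonQueryUtil.splitFilePath("/data/part0.csv,/data/part1.csv",
          partitionsFiles, ",");
      System.out.println(partitionsFiles);  // [/data/part0.csv, /data/part1.csv]
    }
  }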