You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/03/07 06:44:34 UTC

[1/2] incubator-carbondata git commit: Compaction Partitioning changes

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 50ec532a2 -> 8e8bc5de2


Compaction Partitioning changes

Compaction Cardinality Changes

Test Case Update


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/49ad2bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/49ad2bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/49ad2bb0

Branch: refs/heads/master
Commit: 49ad2bb0d8400b2af5a76a2518f0da8e6bf2047a
Parents: 50ec532
Author: sounakr <so...@gmail.com>
Authored: Fri Feb 17 20:12:39 2017 +0530
Committer: sounakr <so...@gmail.com>
Committed: Mon Mar 6 13:02:05 2017 +0530

----------------------------------------------------------------------
 .../hadoop/util/CarbonInputSplitTaskInfo.java   | 129 ++++++++++++
 .../resources/compaction/compactionIUD1.csv     |   6 +
 .../resources/compaction/compactionIUD2.csv     |   6 +
 .../resources/compaction/compactionIUD3.csv     |   6 +
 .../resources/compaction/compactionIUD4.csv     |   6 +
 .../DataCompactionBlockletBoundryTest.scala     |   2 +-
 .../DataCompactionCardinalityBoundryTest.scala  |   1 +
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 197 +++++++++++--------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  16 +-
 .../datacompaction/DataCompactionTest.scala     |  62 +++++-
 .../testsuite/iud/IUDCompactionTestCases.scala  |  54 ++---
 12 files changed, 376 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
new file mode 100644
index 0000000..3a53765
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+/**
+ * Holds the {@link CarbonInputSplit}s of one task together with the task id, as a
+ * {@link Distributable} unit.
+ */
+public class CarbonInputSplitTaskInfo implements Distributable {
+
+  private final List<CarbonInputSplit> carbonBlockInfoList;
+
+  private final String taskId;
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  /** Returns the splits of this task; this is the list that was passed to the constructor. */
+  public List<CarbonInputSplit> getCarbonInputSplitList() {
+    return carbonBlockInfoList;
+  }
+
+  public CarbonInputSplitTaskInfo(String taskId, List<CarbonInputSplit> carbonSplitListInfo) {
+    this.taskId = taskId;
+    this.carbonBlockInfoList = carbonSplitListInfo;
+  }
+
+  /**
+   * Returns the preferred nodes of this task, as computed by {@link #maxNoNodes(List)}
+   * over the splits of the task.
+   *
+   * @throws RuntimeException if the locations of a split cannot be read
+   */
+  @Override public String[] getLocations() {
+    List<String> nodes = maxNoNodes(carbonBlockInfoList);
+    return nodes.toArray(new String[nodes.size()]);
+  }
+
+  /** Orders task infos by their task id; the argument must be a CarbonInputSplitTaskInfo. */
+  @Override public int compareTo(Distributable o) {
+    return taskId.compareTo(((CarbonInputSplitTaskInfo) o).getTaskId());
+  }
+
+  /**
+   * Finds the node that holds the maximum number of the given splits.
+   *
+   * @param splitList splits whose locations are counted
+   * @return every node, sorted by name, when all nodes hold the same number of splits
+   *         (an empty list when no location is known); otherwise a single-element list
+   *         with the node that holds the most splits
+   * @throws RuntimeException if the locations of a split cannot be read
+   */
+  public static List<String> maxNoNodes(List<CarbonInputSplit> splitList) {
+    // TreeMap iterates in node-name order, so ties are resolved deterministically.
+    Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>();
+
+    // populate the map of node and number of occurrences of that node.
+    for (CarbonInputSplit split : splitList) {
+      try {
+        for (String node : split.getLocations()) {
+          Integer nodeOccurence = nodeAndOccurenceMapping.get(node);
+          // Integer is immutable: the new count must be stored back into the map,
+          // incrementing only the local copy would leave every node at 1.
+          nodeAndOccurenceMapping.put(node, null == nodeOccurence ? 1 : nodeOccurence + 1);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Fail to get location of split: " + split, e);
+      }
+    }
+
+    boolean allNodesEqual = true;
+    int maxOccurence = 0;
+    String maxNode = null;
+    Integer firstOccurence = null;
+
+    // check which node occurred the maximum number of times.
+    for (Map.Entry<String, Integer> entry : nodeAndOccurenceMapping.entrySet()) {
+      if (entry.getValue() > maxOccurence) {
+        maxOccurence = entry.getValue();
+        maxNode = entry.getKey();
+      }
+      if (null == firstOccurence) {
+        // first entry: remember its count as the reference for the equality check.
+        firstOccurence = entry.getValue();
+      } else if (!Objects.equals(firstOccurence, entry.getValue())) {
+        allNodesEqual = false;
+      }
+    }
+
+    // if all the nodes have equal occurrence then return the complete key set.
+    if (allNodesEqual) {
+      return new ArrayList<>(nodeAndOccurenceMapping.keySet());
+    }
+
+    // otherwise return only the node with the maximum number of splits.
+    List<String> node = new ArrayList<>(1);
+    node.add(maxNode);
+    return node;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv
new file mode 100644
index 0000000..f9c9e29
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date,phonetype,serialname,ID,salary
+FirstOne,LastOne,07/24/2015, phone197,ASD69643,1,15000
+FirstSecond,LastSecond,07/24/2015,phone756,ASD42892,2,15001
+FirstThird,LastThird,07/25/2015,phone1904,ASD37014,3,15002
+FirstFour,LastFour,07/26/2015,phone2435,ASD66902,4,15003
+FirstFive,LastFive,07/27/2015,phone2441,ASD90633,5,15004

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv
new file mode 100644
index 0000000..652f182
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date, phonetype,serialname,ID,salary
+FirstSix,LastSix,07/24/2015, phone197,ASD69643,6, 15000
+FirstSeven, LastSeven, 07/24/2015,phone756,ASD42892,7,15001
+FirstEight, LastEight, 07/25/2015,phone1904,ASD37014,8,15002
+FirstNine, LastNine, 07/26/2015,phone2435,ASD66902,9,15003
+FirstTen, LastTen, 07/27/2015,phone2441,ASD90633,10,15004

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv
new file mode 100644
index 0000000..9986a9c
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date, phonetype,serialname,ID,salary
+FirstEleven,LastEleven,07/24/2015, phone197,ASD69643,11, 15000
+FirstTwelve, LastTwelve, 07/24/2015,phone756,ASD42892,12,15001
+FirstThirteen, LastThirteen, 07/25/2015,phone1904,ASD37014,13,15002
+FirstFourteen, LastFourteen, 07/26/2015,phone2435,ASD66902,14,15003
+FirstFifteen, LastFifteen, 07/27/2015,phone2441,ASD90633,15,15004

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv
new file mode 100644
index 0000000..baf8596
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date, phonetype,serialname,ID,salary
+FirstSixteen,LastSixteen,07/24/2015, phone197,ASD69643,16, 15000
+FirstSeventeen, LastSeventeen, 07/24/2015,phone756,ASD42892,17,15001
+FirstEighteen, LastEighteen, 07/25/2015,phone1904,ASD37014,18,15002
+FirstNineteen, LastNineteen, 07/26/2015,phone2435,ASD66902,19,15003
+FirstTwenty, LastTwenty, 07/27/2015,phone2441,ASD90633,20,15004

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
index 05895c4..a5078e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
@@ -34,7 +34,7 @@ class DataCompactionBlockletBoundryTest extends QueryTest with BeforeAndAfterAll
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
-        "120")
+        "125")
     sql(
       "CREATE TABLE IF NOT EXISTS blocklettest (country String, ID String, date Timestamp, name " +
         "String, " +

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index 2a5ae92..311c53f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -112,6 +112,7 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
     )
   }
 
+
   override def afterAll {
     sql("drop table if exists  cardinalityTest")
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index d4918ce..f04669c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -81,14 +81,13 @@ class CarbonIUDMergerRDD[K, V](
     // group blocks by segment.
     val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId)
 
-    var i = 0
+    var i = -1
 
     // No need to get a new SegmentUpdateStatus Manager as the Object is passed
     // in CarbonLoadModel.
     // val manager = new SegmentUpdateStatusManager(absoluteTableIdentifier)
     val updateStatusManager = carbonLoadModel.getSegmentUpdateStatusManager
 
-
     // make one spark partition for one segment
     val resultSplits = splitsGroupedMySegment.map(entry => {
       val (segName, splits) = (entry._1, entry._2)
@@ -100,6 +99,7 @@ class CarbonIUDMergerRDD[K, V](
 
       if (!validSplits.isEmpty) {
         val locations = validSplits(0).getLocations
+        i += 1
         new CarbonSparkPartition(id, i,
           new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations))
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 73f3bcd..7a506ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.rdd
 
 import java.io.IOException
+import java.util
 import java.util.{Collections, List}
 
 import scala.collection.JavaConverters._
@@ -37,18 +38,19 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block._
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, CompactionType, RowResultMerger}
+import org.apache.carbondata.spark.merger._
 import org.apache.carbondata.spark.splits.TableSplit
 
 class CarbonMergerRDD[K, V](
@@ -235,9 +237,19 @@ class CarbonMergerRDD[K, V](
     iter
   }
 
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonSparkPartition]
-    theSplit.split.value.getLocations.filter(_ != "localhost")
+
+  // Raises every entry of targetCardinality to the corresponding entry of
+  // sourceCardinality when that one is larger; only the first columnSize columns
+  // are visited. targetCardinality is modified in place.
+  def calculateCardanility(targetCardinality: Array[Int],
+      sourceCardinality: Array[Int],
+      columnSize: Int): Unit = {
+    // Walk from the last column down to the first.
+    for (col <- (columnSize - 1) to 0 by -1) {
+      if (sourceCardinality(col) > targetCardinality(col)) {
+        targetCardinality(col) = sourceCardinality(col)
+      }
+    }
   }
 
   override def getPartitions: Array[Partition] = {
@@ -245,111 +257,128 @@ class CarbonMergerRDD[K, V](
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
       hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
-    val updateStatusManger: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
+    val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       absoluteTableIdentifier)
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
+    var partitionNo = 0
+    var columnSize = 0
+    var noOfBlocks = 0
 
     // mapping of the node and block list.
     var nodeBlockMapping: java.util.Map[String, java.util.List[Distributable]] = new
-            java.util.HashMap[String, java.util.List[Distributable]]
+        java.util.HashMap[String, java.util.List[Distributable]]
 
-    var noOfBlocks = 0
     val taskInfoList = new java.util.ArrayList[Distributable]
     var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
 
-    var blocksOfLastSegment: List[TableBlockInfo] = null
+    var splitsOfLastSegment: List[CarbonInputSplit] = null
+    // map for keeping the relation of a task and its blocks.
+    val taskIdMapping: java.util.Map[String, java.util.List[CarbonInputSplit]] = new
+        java.util.HashMap[String, java.util.List[CarbonInputSplit]]
 
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
-      // map for keeping the relation of a task and its blocks.
-      val taskIdMapping: java.util.Map[String, java.util.List[TableBlockInfo]] = new
-            java.util.HashMap[String, java.util.List[TableBlockInfo]]
 
       // map for keeping the relation of a task and its blocks.
       job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
 
+      val updateDetails: UpdateVO = updateStatusManager.getInvalidTimestampRange(eachSeg)
+
       // get splits
       val splits = format.getSplits(job)
-      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
-      val updateDetails: UpdateVO = updateStatusManger.getInvalidTimestampRange(eachSeg)
-
-      // take the blocks of one segment.
-      val blocksOfOneSegment = carbonInputSplits.map(inputSplit =>
-        new TableBlockInfo(inputSplit.getPath.toString,
-          inputSplit.getStart, inputSplit.getSegmentId,
-          inputSplit.getLocations, inputSplit.getLength, inputSplit.getVersion
-        )
-      )
-        .filter(blockInfo => !CarbonUtil
-          .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManger))
 
       // keep on assigning till last one is reached.
-      if (null != blocksOfOneSegment && blocksOfOneSegment.size > 0) {
-        blocksOfLastSegment = blocksOfOneSegment.asJava
+      if (null != splits && splits.size > 0) {
+        splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
       }
 
-      // populate the task and its block mapping.
-      blocksOfOneSegment.foreach(f = tableBlockInfo => {
-        val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath)
-        val blockList = taskIdMapping.get(taskNo)
-        if (null == blockList) {
-          val blockListTemp = new java.util.ArrayList[TableBlockInfo]()
-          blockListTemp.add(tableBlockInfo)
-          taskIdMapping.put(taskNo, blockListTemp)
-        }
-        else {
-          blockList.add(tableBlockInfo)
-        }
+      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
+        val blockInfo = new TableBlockInfo(entry.getPath.toString,
+          entry.getStart, entry.getSegmentId,
+          entry.getLocations, entry.getLength, entry.getVersion
+        )
+        !CarbonUtil
+          .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager)
       })
-
-      noOfBlocks += blocksOfOneSegment.size
-      taskIdMapping.asScala.foreach(
-        entry =>
-          taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
-      )
-
     }
 
     // prepare the details required to extract the segment properties using last segment.
-    if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
-      // taking head as scala sequence is use and while adding it will add at first
-      // so as we need to update the update the key of older segments with latest keygenerator
-      // we need to take the top of the split
-      val carbonInputSplit = carbonInputSplits.head
+    if (null != splitsOfLastSegment && splitsOfLastSegment.size() > 0) {
+      val carbonInputSplit = splitsOfLastSegment.get(0)
       var dataFileFooter: DataFileFooter = null
 
       try {
         dataFileFooter = CarbonUtil.readMetadatFile(
-            CarbonInputSplit.getTableBlockInfo(carbonInputSplit))
+          CarbonInputSplit.getTableBlockInfo(carbonInputSplit))
       } catch {
         case e: IOException =>
           logError("Exception in preparing the data file footer for compaction " + e.getMessage)
           throw e
       }
 
-      carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
-        .getColumnCardinality
+      columnSize = dataFileFooter.getSegmentInfo.getColumnCardinality.size
       carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
         .toList
     }
 
-    // val blocks = carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava
-    // send complete list of blocks to the mapping util.
-    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
+    var cardinality = new Array[Int](columnSize)
+
+    carbonInputSplits.foreach(splits => {
+      val taskNo = splits.taskId
+      var dataFileFooter: DataFileFooter = null
+
+      val splitList = taskIdMapping.get(taskNo)
+      noOfBlocks += 1
+      if (null == splitList) {
+        val splitTempList = new util.ArrayList[CarbonInputSplit]()
+        splitTempList.add(splits)
+        taskIdMapping.put(taskNo, splitTempList)
+      } else {
+        splitList.add(splits)
+      }
+
+      // Check the cardinality of each columns and set the highest.
+      try {
+        dataFileFooter = CarbonUtil.readMetadatFile(
+          CarbonInputSplit.getTableBlockInfo(splits))
+      } catch {
+        case e: IOException =>
+          logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+          throw e
+      }
+
+      // Calculate the Cardinality of the new segment
+      calculateCardanility(cardinality,
+        dataFileFooter.getSegmentInfo.getColumnCardinality,
+        columnSize)
+    }
+    )
+
+    // Set cardinality for new segment.
+    carbonMergerMapping.maxSegmentColCardinality = cardinality
+
+    taskIdMapping.asScala.foreach(
+      entry =>
+        taskInfoList
+          .add(new CarbonInputSplitTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
+    )
+
+    val nodeBlockMap = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
+
+    val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
 
     val confExecutors = confExecutorsTemp.toInt
-    val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
+    val requiredExecutors = if (nodeBlockMap.size > confExecutors) {
       confExecutors
-    } else { nodeBlockMapping.size() }
+    } else { nodeBlockMap.size() }
     DistributionUtil.ensureExecutors(sparkContext, requiredExecutors, taskInfoList.size)
     logInfo("No.of Executors required=" + requiredExecutors +
             " , spark.executor.instances=" + confExecutors +
-            ", no.of.nodes where data present=" + nodeBlockMapping.size())
+            ", no.of.nodes where data present=" + nodeBlockMap.size())
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {
@@ -359,31 +388,29 @@ class CarbonMergerRDD[K, V](
     }
     logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
     defaultParallelism = sparkContext.defaultParallelism
-    var i = 0
-
-    val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
 
     // Create Spark Partition for each task and assign blocks
-    nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
-      val taskBlockList = new java.util.ArrayList[NodeInfo](0)
-      nodeTaskBlocksMap.put(nodeName, taskBlockList)
+    nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
+      val taskSplitList = new java.util.ArrayList[NodeInfo](0)
+      nodeTaskBlocksMap.put(nodeName, taskSplitList)
       var blockletCount = 0
-      blockList.asScala.foreach { taskInfo =>
-        val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
-        blockletCount = blockletCount + blocksPerNode.getTableBlockInfoList.size()
-        taskBlockList.add(
-          NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size()))
-      }
-      if (blockletCount != 0) {
-        val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
-          carbonInputSplits.asJava, Array(nodeName))
-        result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
-        i += 1
+      splitList.asScala.foreach { splitInfo =>
+        val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
+        blockletCount = blockletCount + splitsPerNode.getCarbonInputSplitList.size()
+        taskSplitList.add(
+          NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size()))
+
+        if (blockletCount != 0) {
+          val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+            splitInfo.asInstanceOf[CarbonInputSplitTaskInfo].getCarbonInputSplitList,
+            Array(nodeName))
+          result.add(new CarbonSparkPartition(id, partitionNo, multiBlockSplit))
+          partitionNo += 1
+        }
       }
     }
 
     // print the node info along with task and number of blocks for the task.
-
     nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
       logInfo(s"for the node ${ entry._1 }")
       for (elem <- entry._2.asScala) {
@@ -396,17 +423,22 @@ class CarbonMergerRDD[K, V](
     logInfo(s"Identified  no.of.Blocks: $noOfBlocks," +
             s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
     logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
-    for (j <- 0 until result.size ) {
+    for (j <- 0 until result.size) {
       val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
       val splitList = multiBlockSplit.getAllSplits
-      logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
-              s"${CarbonInputSplit.createBlocks(splitList).size}")
+      logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
+              s"${ CarbonInputSplit.createBlocks(splitList).size }")
     }
     result.toArray(new Array[Partition](result.size))
   }
 
+  // Preferred hosts of a partition: the locations of its split, without "localhost".
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    split.asInstanceOf[CarbonSparkPartition].split.value.getLocations.filterNot(_ == "localhost")
+  }
 }
 
+
 class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
   extends Partition {
 
@@ -415,3 +447,6 @@ class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: T
 
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
+
+// Carries a list of splits as a CarbonInputSplit subtype; declares no members of its own.
+case class SplitTaskInfo(splits: List[CarbonInputSplit]) extends CarbonInputSplit

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index df6386b..fe92aad 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -77,7 +77,21 @@ object CarbonDataRDDFactory {
     if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
       compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
       compactionType = CompactionType.MAJOR_COMPACTION
-    } else {
+    } else if (alterTableModel.compactionType
+      .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
+      compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      if (alterTableModel.segmentUpdateStatusManager.get != None) {
+        carbonLoadModel
+          .setSegmentUpdateStatusManager((alterTableModel.segmentUpdateStatusManager.get))
+        carbonLoadModel
+          .setSegmentUpdateDetails(alterTableModel.segmentUpdateStatusManager.get
+            .getUpdateStatusDetails.toList.asJava)
+        carbonLoadModel
+          .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
+            .getLoadMetadataDetails.toList.asJava)
+      }
+    }
+    else {
       compactionType = CompactionType.MINOR_COMPACTION
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 45d34e8..76e327c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -154,8 +154,68 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+
+  test("check if compaction with Updates") {
+    // Loads four CSV files, updates one column value, runs a major compaction, then checks the data.
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql("drop table if exists  cardinalityUpdatetest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // NOTE(review): "mm" is minutes in SimpleDateFormat-style patterns; confirm "MM" was not meant.
+    sql(
+      "CREATE TABLE IF NOT EXISTS cardinalityUpdateTest (FirstName String, LastName String, date Timestamp," +
+      "phonetype String, serialname String, ID int, salary Int) STORED BY 'org.apache.carbondata" +
+      ".format'"
+    )
+
+    val csvFilePath1 = s"$resourcesPath/compaction/compactionIUD1.csv"
+    val csvFilePath2 = s"$resourcesPath/compaction/compactionIUD2.csv"
+    val csvFilePath3 = s"$resourcesPath/compaction/compactionIUD3.csv"
+    val csvFilePath4 = s"$resourcesPath/compaction/compactionIUD4.csv"
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath3 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath4 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+
+    // Update FirstName for rows with ID = 2 (presumably in the first load; CSV contents not shown).
+    sql("update cardinalityUpdateTest set (FirstName) = ('FirstTwentyOne') where ID = 2").show()
+
+    // Run a major compaction on the table after the update.
+    sql("alter table cardinalityUpdateTest compact 'major'").show()
+
+    // After compaction the updated value must still be returned, exactly one row
+    // must carry it, and the table must contain 20 rows in total.
+    checkAnswer(
+      sql("select FirstName from cardinalityUpdateTest where FirstName = ('FirstTwentyOne')"),
+      Seq(Row("FirstTwentyOne")
+      )
+    )
+
+    checkAnswer(
+      sql("select count(*) from cardinalityUpdateTest where FirstName = ('FirstTwentyOne')"),
+      Seq(Row(1)
+      )
+    )
+
+    checkAnswer(
+      sql("select count(*) from cardinalityUpdateTest"),
+      Seq(Row(20)
+      )
+    )
+  }
+
   override def afterAll {
-    sql("drop table if exists  normalcompaction")
+    sql("drop table if exists normalcompaction")
+    sql("drop table if exists cardinalityUpdatetest")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
index 480e865..fd8c6cf 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
@@ -127,7 +127,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     )
     sql("""drop table dest2""").show()
   }
-/*
+
 
   test("test IUD Horizontal Compaction Delete") {
     sql("""drop database if exists iud4 cascade""").show()
@@ -136,15 +136,15 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
     sql("""select * from dest2""").show()
     sql(
       """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source3.csv' INTO table source2""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""").show()
     sql("""select * from source2""").show()
     sql("""delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
     sql("""select * from dest2 order by 2""").show()
@@ -185,14 +185,14 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
     sql(
       """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source3.csv' INTO table source2""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""").show()
     sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
     sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c11,s.c66 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
     sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and (s.c22 > 3 and s.c22 < 5) or (s.c22 > 13 and s.c22 < 15) or (s.c22 > 23 and s.c22 < 25) or (s.c22 > 33 and s.c22 < 35))""").show()
@@ -254,14 +254,14 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
     sql(
       """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source3.csv' INTO table source2""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""").show()
     sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
     sql("""delete from dest2 where (c2 < 2) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
     sql("""delete from dest2 where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""").show()
@@ -301,7 +301,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """create table T_Carbn01(Active_status String,Item_type_cd INT,Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Discount_price DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Update_time TIMESTAMP,Create_date String)STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/T_Hive1.csv' INTO table t_carbn01 options ('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE','DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='Active_status,Item_type_cd,Qty_day_avg,Qty_total,Sell_price,Sell_pricep,Discount_price,Profit,Item_code,Item_name,Outlet_name,Update_time,Create_date')""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/T_Hive1.csv' INTO table t_carbn01 options ('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE','DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='Active_status,Item_type_cd,Qty_day_avg,Qty_total,Sell_price,Sell_pricep,Discount_price,Profit,Item_code,Item_name,Outlet_name,Update_time,Create_date')""").show()
     sql("""update t_carbn01 set (item_code) = ('Orange') where item_type_cd = 14""").show()
     sql("""update t_carbn01 set (item_code) = ('Banana') where item_type_cd = 2""").show()
     sql("""delete from t_carbn01 where item_code in ('RE3423ee','Orange','Banana')""").show()
@@ -325,15 +325,15 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
     sql(
       """delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""")
 
       .show()
-    sql("""delete from table dest2 where segment.id IN(0)""").show()
+    sql("""DELETE SEGMENT 0 FROM TABLE dest2""").show()
     sql("""clean files for table dest2""").show()
     sql(
       """update dest2 set (c5) = ('8RAM size') where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""")
@@ -352,10 +352,10 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
       .show()
-    sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
-    sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
     sql("""delete from dest2 where c2 < 41""").show()
     sql("""alter table dest2 compact 'major'""").show()
     checkAnswer(
@@ -364,7 +364,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     )
     sql("""drop table dest2""").show()
   }
-*/
+
 
   override def afterAll {
     sql("use default")



[2/2] incubator-carbondata git commit: [CARBONDATA-691] After Compaction records count are mismatched. This closes #604

Posted by ra...@apache.org.
[CARBONDATA-691] After Compaction records count are mismatched. This closes #604


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8e8bc5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8e8bc5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8e8bc5de

Branch: refs/heads/master
Commit: 8e8bc5de2aee3497c60667d128910255a5c5d1f9
Parents: 50ec532 49ad2bb
Author: ravipesala <ra...@gmail.com>
Authored: Tue Mar 7 12:13:53 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Mar 7 12:13:53 2017 +0530

----------------------------------------------------------------------
 .../hadoop/util/CarbonInputSplitTaskInfo.java   | 129 ++++++++++++
 .../resources/compaction/compactionIUD1.csv     |   6 +
 .../resources/compaction/compactionIUD2.csv     |   6 +
 .../resources/compaction/compactionIUD3.csv     |   6 +
 .../resources/compaction/compactionIUD4.csv     |   6 +
 .../DataCompactionBlockletBoundryTest.scala     |   2 +-
 .../DataCompactionCardinalityBoundryTest.scala  |   1 +
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 197 +++++++++++--------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  16 +-
 .../datacompaction/DataCompactionTest.scala     |  62 +++++-
 .../testsuite/iud/IUDCompactionTestCases.scala  |  54 ++---
 12 files changed, 376 insertions(+), 113 deletions(-)
----------------------------------------------------------------------