Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/03/07 06:44:34 UTC
[1/2] incubator-carbondata git commit: Compaction Partitioning changes
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 50ec532a2 -> 8e8bc5de2
Compaction Partitioning changes
Compaction Cardinality Changes
Test Case Update
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/49ad2bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/49ad2bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/49ad2bb0
Branch: refs/heads/master
Commit: 49ad2bb0d8400b2af5a76a2518f0da8e6bf2047a
Parents: 50ec532
Author: sounakr <so...@gmail.com>
Authored: Fri Feb 17 20:12:39 2017 +0530
Committer: sounakr <so...@gmail.com>
Committed: Mon Mar 6 13:02:05 2017 +0530
----------------------------------------------------------------------
.../hadoop/util/CarbonInputSplitTaskInfo.java | 129 ++++++++++++
.../resources/compaction/compactionIUD1.csv | 6 +
.../resources/compaction/compactionIUD2.csv | 6 +
.../resources/compaction/compactionIUD3.csv | 6 +
.../resources/compaction/compactionIUD4.csv | 6 +
.../DataCompactionBlockletBoundryTest.scala | 2 +-
.../DataCompactionCardinalityBoundryTest.scala | 1 +
.../spark/rdd/CarbonIUDMergerRDD.scala | 4 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 197 +++++++++++--------
.../spark/rdd/CarbonDataRDDFactory.scala | 16 +-
.../datacompaction/DataCompactionTest.scala | 62 +++++-
.../testsuite/iud/IUDCompactionTestCases.scala | 54 ++---
12 files changed, 376 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
new file mode 100644
index 0000000..3a53765
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+public class CarbonInputSplitTaskInfo implements Distributable {
+
+ private final List<CarbonInputSplit> carbonBlockInfoList;
+
+ private final String taskId;
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public List<CarbonInputSplit> getCarbonInputSplitList() {
+ return carbonBlockInfoList;
+ }
+
+ public CarbonInputSplitTaskInfo(String taskId, List<CarbonInputSplit> carbonSplitListInfo) {
+ this.taskId = taskId;
+ this.carbonBlockInfoList = carbonSplitListInfo;
+ }
+
+ @Override public String[] getLocations() {
+ // Node selection is delegated to the occurrence heuristic below: the node
+ // hosting the most blocks of this task wins, and when all counts tie,
+ // every node is returned.
+ List<String> nodes = CarbonInputSplitTaskInfo.maxNoNodes(carbonBlockInfoList);
+ return nodes.toArray(new String[nodes.size()]);
+ }
+
+ @Override public int compareTo(Distributable o) {
+ return taskId.compareTo(((CarbonInputSplitTaskInfo) o).getTaskId());
+ }
+
+ /**
+ * Find the node that hosts the maximum number of blocks among the given splits.
+ *
+ * @param splitList splits belonging to a single task
+ * @return the busiest node, or all nodes when every node hosts the same count
+ */
+ public static List<String> maxNoNodes(List<CarbonInputSplit> splitList) {
+ boolean useIndex = true;
+ Integer maxOccurence = 0;
+ String maxNode = null;
+ Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>();
+
+ // populate the map of node and the number of occurrences of that node.
+ for (CarbonInputSplit split : splitList) {
+ try {
+ for (String node : split.getLocations()) {
+ Integer nodeOccurence = nodeAndOccurenceMapping.get(node);
+ if (null == nodeOccurence) {
+ nodeAndOccurenceMapping.put(node, 1);
+ } else {
+ nodeAndOccurenceMapping.put(node, nodeOccurence + 1); // write back the incremented count
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to get location of split: " + split, e);
+ }
+ }
+ Integer previousValueOccurence = null;
+
+ // check which node occurred the maximum number of times.
+ for (Map.Entry<String, Integer> entry : nodeAndOccurenceMapping.entrySet()) {
+ // finding the maximum node.
+ if (entry.getValue() > maxOccurence) {
+ maxOccurence = entry.getValue();
+ maxNode = entry.getKey();
+ }
+ // first iteration: initialize the previous value.
+ if (null == previousValueOccurence) {
+ previousValueOccurence = entry.getValue();
+ } else {
+ // when all the nodes host the same number of blocks, the complete
+ // node list must be returned instead of a single max node.
+ if (!Objects.equals(previousValueOccurence, entry.getValue())) {
+ useIndex = false;
+ }
+ }
+ }
+
+ // if all the nodes have an equal occurrence count, return the complete key set.
+ if (useIndex) {
+ return new ArrayList<>(nodeAndOccurenceMapping.keySet());
+ }
+
+ // otherwise return only the node with the maximum occurrence count.
+ List<String> node = new ArrayList<>(1);
+ node.add(maxNode);
+ return node;
+ }
+
+}
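----------------------------------------------------------------------
For reference, the node-selection heuristic in maxNoNodes above reduces to:
count how often each host appears across a task's splits, return the single
busiest host, and fall back to the full host set when every count is equal.
A minimal standalone Scala sketch of the same idea (plain collections stand
in for CarbonInputSplit and its getLocations(); the names are illustrative,
not CarbonData API):

object LocalityHeuristicSketch {
  // For one task, count how often each host appears across its splits and
  // prefer the host with the most blocks; when every host appears the same
  // number of times, keep the whole host set as preferred locations.
  def preferredNodes(hostsPerSplit: Seq[Seq[String]]): Seq[String] = {
    val occurrences = hostsPerSplit.flatten
      .groupBy(identity)
      .map { case (host, hits) => host -> hits.size }
    if (occurrences.isEmpty) {
      Seq.empty
    } else if (occurrences.values.toSet.size == 1) {
      occurrences.keys.toSeq.sorted      // all hosts tie: return all of them
    } else {
      Seq(occurrences.maxBy(_._2)._1)    // otherwise the single busiest host
    }
  }

  def main(args: Array[String]): Unit = {
    println(preferredNodes(Seq(Seq("n1", "n2"), Seq("n1"))))  // List(n1)
    println(preferredNodes(Seq(Seq("n1"), Seq("n2"))))        // List(n1, n2)
  }
}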
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv
new file mode 100644
index 0000000..f9c9e29
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD1.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date,phonetype,serialname,ID,salary
+FirstOne,LastOne,07/24/2015, phone197,ASD69643,1,15000
+FirstSecond,LastSecond,07/24/2015,phone756,ASD42892,2,15001
+FirstThird,LastThird,07/25/2015,phone1904,ASD37014,3,15002
+FirstFour,LastFour,07/26/2015,phone2435,ASD66902,4,15003
+FirstFive,LastFive,07/27/2015,phone2441,ASD90633,5,15004
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv
new file mode 100644
index 0000000..652f182
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD2.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date, phonetype,serialname,ID,salary
+FirstSix,LastSix,07/24/2015, phone197,ASD69643,6, 15000
+FirstSeven, LastSeven, 07/24/2015,phone756,ASD42892,7,15001
+FirstEight, LastEight, 07/25/2015,phone1904,ASD37014,8,15002
+FirstNine, LastNine, 07/26/2015,phone2435,ASD66902,9,15003
+FirstTen, LastTen, 07/27/2015,phone2441,ASD90633,10,15004
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv
new file mode 100644
index 0000000..9986a9c
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD3.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date, phonetype,serialname,ID,salary
+FirstEleven,LastEleven,07/24/2015, phone197,ASD69643,11, 15000
+FirstTwelve, LastTwelve, 07/24/2015,phone756,ASD42892,12,15001
+FirstThirteen, LastThirteen, 07/25/2015,phone1904,ASD37014,13,15002
+FirstFourteen, LastFourteen, 07/26/2015,phone2435,ASD66902,14,15003
+FirstFifteen, LastFifteen, 07/27/2015,phone2441,ASD90633,15,15004
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv b/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv
new file mode 100644
index 0000000..baf8596
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/compactionIUD4.csv
@@ -0,0 +1,6 @@
+FirstName,LastName,date, phonetype,serialname,ID,salary
+FirstSixteen,LastSixteen,07/24/2015, phone197,ASD69643,16, 15000
+FirstSeventeen, LastSeventeen, 07/24/2015,phone756,ASD42892,17,15001
+FirstEighteen, LastEighteen, 07/25/2015,phone1904,ASD37014,18,15002
+FirstNineteen, LastNineteen, 07/26/2015,phone2435,ASD66902,19,15003
+FirstTwenty, LastTwenty, 07/27/2015,phone2441,ASD90633,20,15004
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
index 05895c4..a5078e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
@@ -34,7 +34,7 @@ class DataCompactionBlockletBoundryTest extends QueryTest with BeforeAndAfterAll
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
- "120")
+ "125")
sql(
"CREATE TABLE IF NOT EXISTS blocklettest (country String, ID String, date Timestamp, name " +
"String, " +
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index 2a5ae92..311c53f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -112,6 +112,7 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
)
}
+
override def afterAll {
sql("drop table if exists cardinalityTest")
CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index d4918ce..f04669c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -81,14 +81,13 @@ class CarbonIUDMergerRDD[K, V](
// group blocks by segment.
val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId)
- var i = 0
+ var i = -1
// No need to get a new SegmentUpdateStatus Manager as the Object is passed
// in CarbonLoadModel.
// val manager = new SegmentUpdateStatusManager(absoluteTableIdentifier)
val updateStatusManager = carbonLoadModel.getSegmentUpdateStatusManager
-
// make one spark partition for one segment
val resultSplits = splitsGroupedMySegment.map(entry => {
val (segName, splits) = (entry._1, entry._2)
@@ -100,6 +99,7 @@ class CarbonIUDMergerRDD[K, V](
if (!validSplits.isEmpty) {
val locations = validSplits(0).getLocations
+ i += 1
new CarbonSparkPartition(id, i,
new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations))
}
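----------------------------------------------------------------------
The CarbonIUDMergerRDD change above starts the partition counter at -1 and
advances it only inside the branch that actually builds a partition, so
segments whose splits are all filtered out no longer consume an index. A
minimal sketch of that pattern, with a plain case class standing in for
CarbonSparkPartition (illustrative names, not the project's types):

case class PartitionSketch(index: Int, segment: String)

// Assign consecutive indices only to segment groups that yield valid
// splits, so skipped (empty) segments leave no holes in the numbering.
def partitionPerSegment(splitsBySegment: Map[String, Seq[String]]): Seq[PartitionSketch] = {
  var i = -1                        // advanced only when a partition is emitted
  splitsBySegment.toSeq.flatMap { case (segment, validSplits) =>
    if (validSplits.nonEmpty) {
      i += 1
      Some(PartitionSketch(i, segment))
    } else {
      None                          // empty segment: no index consumed
    }
  }
}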
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 73f3bcd..7a506ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.spark.rdd
import java.io.IOException
+import java.util
import java.util.{Collections, List}
import scala.collection.JavaConverters._
@@ -37,18 +38,19 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block._
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, CompactionType, RowResultMerger}
+import org.apache.carbondata.spark.merger._
import org.apache.carbondata.spark.splits.TableSplit
class CarbonMergerRDD[K, V](
@@ -235,9 +237,19 @@ class CarbonMergerRDD[K, V](
iter
}
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonSparkPartition]
- theSplit.split.value.getLocations.filter(_ != "localhost")
+
+ // Merge each block's cardinality into targetCardinality, keeping the
+ // per-column maximum (mutates targetCardinality in place).
+ def calculateCardinality(targetCardinality: Array[Int],
+ sourceCardinality: Array[Int],
+ columnSize: Int): Unit = {
+ var cols = columnSize
+
+ // Choose the highest cardinality among all the blocks.
+ while (cols > 0) {
+ if (targetCardinality(cols - 1) < sourceCardinality(cols - 1)) {
+ targetCardinality(cols - 1) = sourceCardinality(cols - 1)
+ }
+ cols -= 1
+ }
}
override def getPartitions: Array[Partition] = {
@@ -245,111 +257,128 @@ class CarbonMergerRDD[K, V](
val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
- val updateStatusManger: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
+ val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
absoluteTableIdentifier)
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
var defaultParallelism = sparkContext.defaultParallelism
val result = new java.util.ArrayList[Partition](defaultParallelism)
+ var partitionNo = 0
+ var columnSize = 0
+ var noOfBlocks = 0
// mapping of the node and block list.
var nodeBlockMapping: java.util.Map[String, java.util.List[Distributable]] = new
- java.util.HashMap[String, java.util.List[Distributable]]
+ java.util.HashMap[String, java.util.List[Distributable]]
- var noOfBlocks = 0
val taskInfoList = new java.util.ArrayList[Distributable]
var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
- var blocksOfLastSegment: List[TableBlockInfo] = null
+ var splitsOfLastSegment: List[CarbonInputSplit] = null
+ // map for keeping the relation of a task and its blocks.
+ val taskIdMapping: java.util.Map[String, java.util.List[CarbonInputSplit]] = new
+ java.util.HashMap[String, java.util.List[CarbonInputSplit]]
// for each valid segment.
for (eachSeg <- carbonMergerMapping.validSegments) {
- // map for keeping the relation of a task and its blocks.
- val taskIdMapping: java.util.Map[String, java.util.List[TableBlockInfo]] = new
- java.util.HashMap[String, java.util.List[TableBlockInfo]]
// map for keeping the relation of a task and its blocks.
job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+ val updateDetails: UpdateVO = updateStatusManager.getInvalidTimestampRange(eachSeg)
+
// get splits
val splits = format.getSplits(job)
- carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
- val updateDetails: UpdateVO = updateStatusManger.getInvalidTimestampRange(eachSeg)
-
- // take the blocks of one segment.
- val blocksOfOneSegment = carbonInputSplits.map(inputSplit =>
- new TableBlockInfo(inputSplit.getPath.toString,
- inputSplit.getStart, inputSplit.getSegmentId,
- inputSplit.getLocations, inputSplit.getLength, inputSplit.getVersion
- )
- )
- .filter(blockInfo => !CarbonUtil
- .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManger))
// keep on assigning till last one is reached.
- if (null != blocksOfOneSegment && blocksOfOneSegment.size > 0) {
- blocksOfLastSegment = blocksOfOneSegment.asJava
+ if (null != splits && splits.size > 0) {
+ splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
}
- // populate the task and its block mapping.
- blocksOfOneSegment.foreach(f = tableBlockInfo => {
- val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath)
- val blockList = taskIdMapping.get(taskNo)
- if (null == blockList) {
- val blockListTemp = new java.util.ArrayList[TableBlockInfo]()
- blockListTemp.add(tableBlockInfo)
- taskIdMapping.put(taskNo, blockListTemp)
- }
- else {
- blockList.add(tableBlockInfo)
- }
+ carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
+ val blockInfo = new TableBlockInfo(entry.getPath.toString,
+ entry.getStart, entry.getSegmentId,
+ entry.getLocations, entry.getLength, entry.getVersion
+ )
+ !CarbonUtil
+ .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager)
})
-
- noOfBlocks += blocksOfOneSegment.size
- taskIdMapping.asScala.foreach(
- entry =>
- taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
- )
-
}
// prepare the details required to extract the segment properties using last segment.
- if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
- // taking head as scala sequence is use and while adding it will add at first
- // so as we need to update the update the key of older segments with latest keygenerator
- // we need to take the top of the split
- val carbonInputSplit = carbonInputSplits.head
+ if (null != splitsOfLastSegment && splitsOfLastSegment.size() > 0) {
+ val carbonInputSplit = splitsOfLastSegment.get(0)
var dataFileFooter: DataFileFooter = null
try {
dataFileFooter = CarbonUtil.readMetadatFile(
- CarbonInputSplit.getTableBlockInfo(carbonInputSplit))
+ CarbonInputSplit.getTableBlockInfo(carbonInputSplit))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
- carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
- .getColumnCardinality
+ columnSize = dataFileFooter.getSegmentInfo.getColumnCardinality.size
carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
.toList
}
- // val blocks = carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava
- // send complete list of blocks to the mapping util.
- nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
+ val cardinality = new Array[Int](columnSize)
+
+ carbonInputSplits.foreach(split => {
+ val taskNo = split.taskId
+ var dataFileFooter: DataFileFooter = null
+
+ val splitList = taskIdMapping.get(taskNo)
+ noOfBlocks += 1
+ if (null == splitList) {
+ val splitTempList = new util.ArrayList[CarbonInputSplit]()
+ splitTempList.add(split)
+ taskIdMapping.put(taskNo, splitTempList)
+ } else {
+ splitList.add(split)
+ }
+
+ // Check the cardinality of each column and keep the highest value.
+ try {
+ dataFileFooter = CarbonUtil.readMetadatFile(
+ CarbonInputSplit.getTableBlockInfo(split))
+ } catch {
+ case e: IOException =>
+ logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+ throw e
+ }
+
+ // Calculate the cardinality of the new segment.
+ calculateCardinality(cardinality,
+ dataFileFooter.getSegmentInfo.getColumnCardinality,
+ columnSize)
+ }
+ )
+
+ // Set cardinality for new segment.
+ carbonMergerMapping.maxSegmentColCardinality = cardinality
+
+ taskIdMapping.asScala.foreach(
+ entry =>
+ taskInfoList
+ .add(new CarbonInputSplitTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
+ )
+
+ val nodeBlockMap = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
+
+ val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
val confExecutors = confExecutorsTemp.toInt
- val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
+ val requiredExecutors = if (nodeBlockMap.size > confExecutors) {
confExecutors
- } else { nodeBlockMapping.size() }
+ } else { nodeBlockMap.size() }
DistributionUtil.ensureExecutors(sparkContext, requiredExecutors, taskInfoList.size)
logInfo("No.of Executors required=" + requiredExecutors +
" , spark.executor.instances=" + confExecutors +
- ", no.of.nodes where data present=" + nodeBlockMapping.size())
+ ", no.of.nodes where data present=" + nodeBlockMap.size())
var nodes = DistributionUtil.getNodeList(sparkContext)
var maxTimes = 30
while (nodes.length < requiredExecutors && maxTimes > 0) {
@@ -359,31 +388,29 @@ class CarbonMergerRDD[K, V](
}
logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
defaultParallelism = sparkContext.defaultParallelism
- var i = 0
-
- val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
// Create Spark Partition for each task and assign blocks
- nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
- val taskBlockList = new java.util.ArrayList[NodeInfo](0)
- nodeTaskBlocksMap.put(nodeName, taskBlockList)
+ nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
+ val taskSplitList = new java.util.ArrayList[NodeInfo](0)
+ nodeTaskBlocksMap.put(nodeName, taskSplitList)
var blockletCount = 0
- blockList.asScala.foreach { taskInfo =>
- val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
- blockletCount = blockletCount + blocksPerNode.getTableBlockInfoList.size()
- taskBlockList.add(
- NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size()))
- }
- if (blockletCount != 0) {
- val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
- carbonInputSplits.asJava, Array(nodeName))
- result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
- i += 1
+ splitList.asScala.foreach { splitInfo =>
+ val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
+ blockletCount = blockletCount + splitsPerNode.getCarbonInputSplitList.size()
+ taskSplitList.add(
+ NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size()))
+
+ if (blockletCount != 0) {
+ val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+ splitInfo.asInstanceOf[CarbonInputSplitTaskInfo].getCarbonInputSplitList,
+ Array(nodeName))
+ result.add(new CarbonSparkPartition(id, partitionNo, multiBlockSplit))
+ partitionNo += 1
+ }
}
}
// print the node info along with task and number of blocks for the task.
-
nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
logInfo(s"for the node ${ entry._1 }")
for (elem <- entry._2.asScala) {
@@ -396,17 +423,22 @@ class CarbonMergerRDD[K, V](
logInfo(s"Identified no.of.Blocks: $noOfBlocks," +
s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
- for (j <- 0 until result.size ) {
+ for (j <- 0 until result.size) {
val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
val splitList = multiBlockSplit.getAllSplits
- logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
- s"${CarbonInputSplit.createBlocks(splitList).size}")
+ logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
+ s"${ CarbonInputSplit.createBlocks(splitList).size }")
}
result.toArray(new Array[Partition](result.size))
}
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonSparkPartition]
+ theSplit.split.value.getLocations.filter(_ != "localhost")
+ }
}
+
class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
extends Partition {
@@ -415,3 +447,6 @@ class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: T
override def hashCode(): Int = 41 * (41 + rddId) + idx
}
+
+case class SplitTaskInfo(splits: List[CarbonInputSplit]) extends CarbonInputSplit {
+}
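----------------------------------------------------------------------
The heart of the cardinality change in CarbonMergerRDD is a column-wise
maximum across the footers of every block being compacted: the merged
segment must be able to encode the dictionary values of all source blocks,
so each column takes the highest cardinality seen anywhere. The same fold
as a self-contained sketch (plain arrays, not the project's API):

// Standalone sketch of what calculateCardinality accumulates across blocks:
// the per-column maximum over all block-footer cardinality arrays.
def mergeCardinality(perBlockCardinality: Seq[Array[Int]]): Array[Int] = {
  require(perBlockCardinality.nonEmpty, "at least one block footer is required")
  perBlockCardinality.reduce { (acc, next) =>
    acc.zip(next).map { case (a, b) => math.max(a, b) }
  }
}

// e.g. mergeCardinality(Seq(Array(100, 5), Array(80, 9))) gives Array(100, 9)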
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index df6386b..fe92aad 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -77,7 +77,21 @@ object CarbonDataRDDFactory {
if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
compactionType = CompactionType.MAJOR_COMPACTION
- } else {
+ } else if (alterTableModel.compactionType
+ .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
+ compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+ if (alterTableModel.segmentUpdateStatusManager.isDefined) {
+ carbonLoadModel
+ .setSegmentUpdateStatusManager(alterTableModel.segmentUpdateStatusManager.get)
+ carbonLoadModel
+ .setSegmentUpdateDetails(alterTableModel.segmentUpdateStatusManager.get
+ .getUpdateStatusDetails.toList.asJava)
+ carbonLoadModel
+ .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
+ .getLoadMetadataDetails.toList.asJava)
+ }
+ } else {
compactionType = CompactionType.MINOR_COMPACTION
}
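----------------------------------------------------------------------
The new branch above dispatches on the requested compaction type string and,
for the IUD update/delete delta case, copies the caller's
SegmentUpdateStatusManager state into the load model. The dispatch itself
can be pictured as a simple match; a hedged sketch over plain strings (the
enum values and load-model wiring are omitted, names illustrative):

def resolveCompactionType(requested: String): String =
  requested.toUpperCase match {
    case "MAJOR"                       => "MAJOR_COMPACTION"
    case "IUD_UPDDEL_DELTA_COMPACTION" => "IUD_UPDDEL_DELTA_COMPACTION"
    case _                             => "MINOR_COMPACTION"
  }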
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 45d34e8..76e327c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -154,8 +154,68 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
)
}
+
+ test("check if compaction with Updates") {
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+ sql("drop table if exists cardinalityUpdatetest")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+
+ sql(
+ "CREATE TABLE IF NOT EXISTS cardinalityUpdateTest (FirstName String, LastName String, date Timestamp," +
+ "phonetype String, serialname String, ID int, salary Int) STORED BY 'org.apache.carbondata" +
+ ".format'"
+ )
+
+ val csvFilePath1 = s"$resourcesPath/compaction/compactionIUD1.csv"
+ val csvFilePath2 = s"$resourcesPath/compaction/compactionIUD2.csv"
+ val csvFilePath3 = s"$resourcesPath/compaction/compactionIUD3.csv"
+ val csvFilePath4 = s"$resourcesPath/compaction/compactionIUD4.csv"
+
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath3 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath4 + "' INTO TABLE cardinalityUpdateTest OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+
+ // update one row in the first segment.
+ sql("update cardinalityUpdateTest set (FirstName) = ('FirstTwentyOne') where ID = 2").show()
+
+ // trigger major compaction through alter table.
+ sql("alter table cardinalityUpdateTest compact 'major'").show()
+
+ // Verify that the updated value survives in the compacted segment
+ // and that the query answers stay the same.
+ checkAnswer(
+ sql("select FirstName from cardinalityUpdateTest where FirstName = ('FirstTwentyOne')"),
+ Seq(Row("FirstTwentyOne")
+ )
+ )
+
+ checkAnswer(
+ sql("select count(*) from cardinalityUpdateTest where FirstName = ('FirstTwentyOne')"),
+ Seq(Row(1)
+ )
+ )
+
+ checkAnswer(
+ sql("select count(*) from cardinalityUpdateTest"),
+ Seq(Row(20)
+ )
+ )
+ }
+
override def afterAll {
- sql("drop table if exists normalcompaction")
+ sql("drop table if exists normalcompaction")
+ sql("drop table if exists cardinalityUpdatetest")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/49ad2bb0/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
index 480e865..fd8c6cf 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
@@ -127,7 +127,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
)
sql("""drop table dest2""").show()
}
-/*
+
test("test IUD Horizontal Compaction Delete") {
sql("""drop database if exists iud4 cascade""").show()
@@ -136,15 +136,15 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
sql("""select * from dest2""").show()
sql(
"""create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source3.csv' INTO table source2""").show()
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""").show()
sql("""select * from source2""").show()
sql("""delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
sql("""select * from dest2 order by 2""").show()
@@ -185,14 +185,14 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
sql(
"""create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source3.csv' INTO table source2""").show()
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""").show()
sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c11,s.c66 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and (s.c22 > 3 and s.c22 < 5) or (s.c22 > 13 and s.c22 < 15) or (s.c22 > 23 and s.c22 < 25) or (s.c22 > 33 and s.c22 < 35))""").show()
@@ -254,14 +254,14 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
sql(
"""create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/source3.csv' INTO table source2""").show()
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""").show()
sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
sql("""delete from dest2 where (c2 < 2) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
sql("""delete from dest2 where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""").show()
@@ -301,7 +301,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""create table T_Carbn01(Active_status String,Item_type_cd INT,Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Discount_price DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Update_time TIMESTAMP,Create_date String)STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/T_Hive1.csv' INTO table t_carbn01 options ('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE','DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='Active_status,Item_type_cd,Qty_day_avg,Qty_total,Sell_price,Sell_pricep,Discount_price,Profit,Item_code,Item_name,Outlet_name,Update_time,Create_date')""").show()
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/T_Hive1.csv' INTO table t_carbn01 options ('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE','DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='Active_status,Item_type_cd,Qty_day_avg,Qty_total,Sell_price,Sell_pricep,Discount_price,Profit,Item_code,Item_name,Outlet_name,Update_time,Create_date')""").show()
sql("""update t_carbn01 set (item_code) = ('Orange') where item_type_cd = 14""").show()
sql("""update t_carbn01 set (item_code) = ('Banana') where item_type_cd = 2""").show()
sql("""delete from t_carbn01 where item_code in ('RE3423ee','Orange','Banana')""").show()
@@ -325,15 +325,15 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
sql(
"""delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""")
.show()
- sql("""delete from table dest2 where segment.id IN(0)""").show()
+ sql("""DELETE SEGMENT 0 FROM TABLE dest2""").show()
sql("""clean files for table dest2""").show()
sql(
"""update dest2 set (c5) = ('8RAM size') where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""")
@@ -352,10 +352,10 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
.show()
- sql("""load data local inpath './src/test/resources/IUD/comp1.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp2.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp3.csv' INTO table dest2""").show()
- sql("""load data local inpath './src/test/resources/IUD/comp4.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""").show()
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""").show()
sql("""delete from dest2 where c2 < 41""").show()
sql("""alter table dest2 compact 'major'""").show()
checkAnswer(
@@ -364,7 +364,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
)
sql("""drop table dest2""").show()
}
-*/
+
override def afterAll {
sql("use default")
[2/2] incubator-carbondata git commit: [CARBONDATA-691] After Compaction records count are mismatched. This closes #604
Posted by ra...@apache.org.
[CARBONDATA-691] After Compaction records count are mismatched. This closes #604
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8e8bc5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8e8bc5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8e8bc5de
Branch: refs/heads/master
Commit: 8e8bc5de2aee3497c60667d128910255a5c5d1f9
Parents: 50ec532 49ad2bb
Author: ravipesala <ra...@gmail.com>
Authored: Tue Mar 7 12:13:53 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Mar 7 12:13:53 2017 +0530
----------------------------------------------------------------------
.../hadoop/util/CarbonInputSplitTaskInfo.java | 129 ++++++++++++
.../resources/compaction/compactionIUD1.csv | 6 +
.../resources/compaction/compactionIUD2.csv | 6 +
.../resources/compaction/compactionIUD3.csv | 6 +
.../resources/compaction/compactionIUD4.csv | 6 +
.../DataCompactionBlockletBoundryTest.scala | 2 +-
.../DataCompactionCardinalityBoundryTest.scala | 1 +
.../spark/rdd/CarbonIUDMergerRDD.scala | 4 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 197 +++++++++++--------
.../spark/rdd/CarbonDataRDDFactory.scala | 16 +-
.../datacompaction/DataCompactionTest.scala | 62 +++++-
.../testsuite/iud/IUDCompactionTestCases.scala | 54 ++---
12 files changed, 376 insertions(+), 113 deletions(-)
----------------------------------------------------------------------