You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/03/25 12:28:45 UTC
[carbondata] branch master updated: [CARBONDATA-3328]Fixed
performance issue with merge small files distribution
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 91c6758 [CARBONDATA-3328]Fixed performance issue with merge small files distribution
91c6758 is described below
commit 91c6758c2468200b70c89604362d79f2ea2a00d5
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Mon Mar 25 12:36:34 2019 +0530
[CARBONDATA-3328]Fixed performance issue with merge small files distribution
Problem
After PR#3154, in the merge-small-files case the split length was coming out as 0; because of this, all the files were being merged, impacting query performance when merge small files distribution is true.
Solution
Now, in the CarbonInputSplit getLength method, if the length is -1 it is read from the dataMapRow, and if the dataMapRow is null it is read from the detailInfo instead.
This closes #3161
---
.../carbondata/core/indexstore/ExtendedBlocklet.java | 2 +-
.../org/apache/carbondata/hadoop/CarbonInputSplit.java | 17 +++++++++++++----
.../apache/carbondata/hadoop/CarbonMultiBlockSplit.java | 8 ++------
.../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
4 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 8c4ea06..3d6cedd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -38,7 +38,7 @@ public class ExtendedBlocklet extends Blocklet {
boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) {
super(filePath, blockletId, compareBlockletIdForObjectMatching);
try {
- this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, 0, version, null);
+ this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, -1, version, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index bb1742c..406456f 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -96,7 +96,7 @@ public class CarbonInputSplit extends FileSplit
private transient List<ColumnSchema> columnSchema;
- private transient boolean useMinMaxForPruning;
+ private boolean useMinMaxForPruning = true;
private boolean isBlockCache = true;
@@ -534,7 +534,7 @@ public class CarbonInputSplit extends FileSplit
out.writeInt(blockletInfoBinary.length);
out.write(blockletInfoBinary);
}
- out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH));
+ out.writeLong(getLength());
out.writeBoolean(this.isLegacyStore);
out.writeBoolean(this.useMinMaxForPruning);
}
@@ -553,7 +553,7 @@ public class CarbonInputSplit extends FileSplit
detailInfo.setBlockFooterOffset(
this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET));
detailInfo
- .setBlockSize(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH));
+ .setBlockSize(getLength());
detailInfo.setLegacyStore(isLegacyStore);
detailInfo.setUseMinMaxForPruning(useMinMaxForPruning);
if (!this.isBlockCache) {
@@ -602,7 +602,16 @@ public class CarbonInputSplit extends FileSplit
public long getStart() { return start; }
@Override
- public long getLength() { return length; }
+ public long getLength() {
+ if (length == -1) {
+ if (null != dataMapRow) {
+ length = this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH);
+ } else if (null != detailInfo) {
+ length = detailInfo.getBlockSize();
+ }
+ }
+ return length;
+ }
@Override
public String toString() { return filePath + ":" + start + "+" + length; }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 4c99c4f..7ac5bc0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -100,18 +100,14 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
public void calculateLength() {
long total = 0;
- if (splitList.size() > 1 && splitList.get(0).getDetailInfo() != null) {
+ if (splitList.size() > 0) {
Map<String, Long> blockSizes = new HashMap<>();
for (CarbonInputSplit split : splitList) {
- blockSizes.put(split.getBlockPath(), split.getDetailInfo().getBlockSize());
+ blockSizes.put(split.getFilePath(), split.getLength());
}
for (Map.Entry<String, Long> entry : blockSizes.entrySet()) {
total += entry.getValue();
}
- } else {
- for (CarbonInputSplit split : splitList) {
- total += split.getLength();
- }
}
length = total;
}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 9e66139..6cee8dc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -304,7 +304,7 @@ class CarbonScanRDD[T: ClassTag](
val blockSplits = splits
.asScala
.map(_.asInstanceOf[CarbonInputSplit])
- .groupBy(f => f.getBlockPath)
+ .groupBy(f => f.getFilePath)
.map { blockSplitEntry =>
new CarbonMultiBlockSplit(
blockSplitEntry._2.asJava,