Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/03/25 12:28:45 UTC

[carbondata] branch master updated: [CARBONDATA-3328]Fixed performance issue with merge small files distribution

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 91c6758  [CARBONDATA-3328]Fixed performance issue with merge small files distribution
91c6758 is described below

commit 91c6758c2468200b70c89604362d79f2ea2a00d5
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Mon Mar 25 12:36:34 2019 +0530

    [CARBONDATA-3328]Fixed performance issue with merge small files distribution
    
    Problem
    After PR#3154, the split length was coming out as 0 in the merge-small-files case; because of this all the files were merged into one split, degrading query performance when merge small files distribution is true.
    
    Solution
    CarbonInputSplit's getLength method now resolves a length of -1 lazily: it first reads the block length from the datamap row, and if the datamap row is null it falls back to the block size in the detail info (see the sketch after the commit message).
    
    This closes #3161
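
For reference, a minimal sketch of the lazy-length fallback described in the
Solution above. This is an illustrative stand-alone class, not CarbonData
code: the two nullable fields stand in for the real dataMapRow and detailInfo
lookups shown in the diff below.

    // Sketch: -1 marks a length that was not set at construction time.
    class LazyLengthSplit {
      private long length = -1;
      private Long dataMapRowBlockLength;   // stand-in for the dataMapRow lookup
      private Long detailInfoBlockSize;     // stand-in for the detailInfo lookup

      long getLength() {
        if (length == -1) {
          if (dataMapRowBlockLength != null) {
            length = dataMapRowBlockLength;   // preferred source
          } else if (detailInfoBlockSize != null) {
            length = detailInfoBlockSize;     // fallback source
          }
        }
        return length;   // remains -1 if neither source is available
      }
    }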
---
 .../carbondata/core/indexstore/ExtendedBlocklet.java    |  2 +-
 .../org/apache/carbondata/hadoop/CarbonInputSplit.java  | 17 +++++++++++++----
 .../apache/carbondata/hadoop/CarbonMultiBlockSplit.java |  8 ++------
 .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala |  2 +-
 4 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 8c4ea06..3d6cedd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -38,7 +38,7 @@ public class ExtendedBlocklet extends Blocklet {
       boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) {
     super(filePath, blockletId, compareBlockletIdForObjectMatching);
     try {
-      this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, 0, version, null);
+      this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, -1, version, null);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index bb1742c..406456f 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -96,7 +96,7 @@ public class CarbonInputSplit extends FileSplit
 
   private transient List<ColumnSchema> columnSchema;
 
-  private transient boolean useMinMaxForPruning;
+  private boolean useMinMaxForPruning = true;
 
   private boolean isBlockCache = true;
 
@@ -534,7 +534,7 @@ public class CarbonInputSplit extends FileSplit
       out.writeInt(blockletInfoBinary.length);
       out.write(blockletInfoBinary);
     }
-    out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH));
+    out.writeLong(getLength());
     out.writeBoolean(this.isLegacyStore);
     out.writeBoolean(this.useMinMaxForPruning);
   }
@@ -553,7 +553,7 @@ public class CarbonInputSplit extends FileSplit
       detailInfo.setBlockFooterOffset(
           this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET));
       detailInfo
-          .setBlockSize(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH));
+          .setBlockSize(getLength());
       detailInfo.setLegacyStore(isLegacyStore);
       detailInfo.setUseMinMaxForPruning(useMinMaxForPruning);
       if (!this.isBlockCache) {
@@ -602,7 +602,16 @@ public class CarbonInputSplit extends FileSplit
   public long getStart() { return start; }
 
   @Override
-  public long getLength() { return length; }
+  public long getLength() {
+    if (length == -1) {
+      if (null != dataMapRow) {
+        length = this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH);
+      } else if (null != detailInfo) {
+        length = detailInfo.getBlockSize();
+      }
+    }
+    return length;
+  }
 
   @Override
   public String toString() { return filePath + ":" + start + "+" + length; }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 4c99c4f..7ac5bc0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -100,18 +100,14 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
 
   public void calculateLength() {
     long total = 0;
-    if (splitList.size() > 1 && splitList.get(0).getDetailInfo() != null) {
+    if (splitList.size() > 0) {
       Map<String, Long> blockSizes = new HashMap<>();
       for (CarbonInputSplit split : splitList) {
-        blockSizes.put(split.getBlockPath(), split.getDetailInfo().getBlockSize());
+        blockSizes.put(split.getFilePath(), split.getLength());
       }
       for (Map.Entry<String, Long> entry : blockSizes.entrySet()) {
         total += entry.getValue();
       }
-    } else {
-      for (CarbonInputSplit split : splitList) {
-        total += split.getLength();
-      }
     }
     length = total;
   }
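
A note on the calculateLength() change above: several blocklet-level splits
can point at the same block file, so sizes are keyed by file path and each
block is counted exactly once. Below is a hedged, self-contained sketch of
that de-duplicating sum; Split is a hypothetical stand-in for
CarbonInputSplit.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BlockSizeSum {
      static final class Split {
        final String filePath;
        final long length;
        Split(String filePath, long length) {
          this.filePath = filePath;
          this.length = length;
        }
      }

      static long totalLength(List<Split> splits) {
        // Last write wins per file path; all splits of one block report
        // the same block length, so the sum counts each block once.
        Map<String, Long> blockSizes = new HashMap<>();
        for (Split split : splits) {
          blockSizes.put(split.filePath, split.length);
        }
        long total = 0;
        for (long size : blockSizes.values()) {
          total += size;
        }
        return total;
      }
    }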
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 9e66139..6cee8dc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -304,7 +304,7 @@ class CarbonScanRDD[T: ClassTag](
           val blockSplits = splits
             .asScala
             .map(_.asInstanceOf[CarbonInputSplit])
-            .groupBy(f => f.getBlockPath)
+            .groupBy(f => f.getFilePath)
             .map { blockSplitEntry =>
               new CarbonMultiBlockSplit(
                 blockSplitEntry._2.asJava,