Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/09 18:26:28 UTC

[44/47] carbondata git commit: [HOTFIX] Modified code to fix the degradation in compaction performance

[HOTFIX] Modified code to fix the degradation in compaction performance

Problem
Compaction performance for 3.5 billion records degraded by 16-20%.

Analysis
The code modification in RawResultIterator.java caused the problem: a few extra checks are performed for every row compared to the previous code.

Fix
Revert the changes in the RawResultIterator class.
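
For readers skimming the diff below, here is a minimal, self-contained sketch of the counter-over-batch iteration pattern that this revert restores. The names used here (Batch, SimpleBatchIterator, BatchIterationSketch) are illustrative stand-ins, not CarbonData's actual classes; the removed prefetch variant additionally checked buffer indices, a prefetch flag, and a Future on every hasNext()/next() call, which is the extra per-row work referred to above.

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchIterationSketch {

  // Simplified stand-in for a RowBatch: just a fixed list of rows.
  static final class Batch {
    private final List<Object[]> rows;
    Batch(List<Object[]> rows) { this.rows = rows; }
    int size() { return rows.size(); }
    Object[] get(int i) { return rows.get(i); }
  }

  // Restored pattern: keep a counter into the current batch and touch the underlying
  // batch source only when the current batch is exhausted, so each row costs a single
  // bounds check instead of prefetch/buffer-state bookkeeping.
  static final class SimpleBatchIterator implements Iterator<Object[]> {
    private final Iterator<Batch> source;
    private Batch batch;
    private int counter;

    SimpleBatchIterator(Iterator<Batch> source) { this.source = source; }

    @Override public boolean hasNext() {
      // Advance to the next non-empty batch, if any.
      while (batch == null || counter >= batch.size()) {
        if (!source.hasNext()) {
          return false;
        }
        batch = source.next();
        counter = 0;
      }
      return true;
    }

    @Override public Object[] next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return batch.get(counter++);
    }
  }

  public static void main(String[] args) {
    Iterator<Batch> source = Arrays.asList(
        new Batch(Arrays.<Object[]>asList(new Object[]{1, "a"}, new Object[]{2, "b"})),
        new Batch(Collections.<Object[]>singletonList(new Object[]{3, "c"}))).iterator();
    SimpleBatchIterator it = new SimpleBatchIterator(source);
    while (it.hasNext()) {
      System.out.println(Arrays.toString(it.next()));
    }
  }
}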

This closes #2613


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7535d46a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7535d46a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7535d46a

Branch: refs/heads/branch-1.4
Commit: 7535d46ac2ca26072189c1d7cb3e6189572c4ce7
Parents: 7ece100
Author: m00258959 <ma...@huawei.com>
Authored: Tue Aug 7 13:24:16 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 9 23:51:36 2018 +0530

----------------------------------------------------------------------
 .../scan/result/iterator/RawResultIterator.java | 205 ++++++++-----------
 .../detailquery/SearchModeTestCase.scala        |   2 +
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +-
 .../merger/CarbonCompactionExecutor.java        |   2 +-
 4 files changed, 85 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 94cea91..efa5b8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -16,22 +16,13 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * This is a wrapper iterator over the detail raw query iterator.
@@ -39,11 +30,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
  * This will handle the batch results and will iterate on the batches and give single row.
  */
 public class RawResultIterator extends CarbonIterator<Object[]> {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RawResultIterator.class.getName());
 
   private final SegmentProperties sourceSegProperties;
 
@@ -53,130 +39,85 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    */
   private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
-  private boolean prefetchEnabled;
-  private List<Object[]> currentBuffer;
-  private List<Object[]> backupBuffer;
-  private int currentIdxInBuffer;
-  private ExecutorService executorService;
-  private Future<Void> fetchFuture;
-  private Object[] currentRawRow = null;
-  private boolean isBackupFilled = false;
-
-  public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
-      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
-      boolean isStreamingHandOff) {
-    this.detailRawQueryResultIterator = detailRawQueryResultIterator;
-    this.sourceSegProperties = sourceSegProperties;
-    this.destinationSegProperties = destinationSegProperties;
-    this.executorService = Executors.newFixedThreadPool(1);
-
-    if (!isStreamingHandOff) {
-      init();
-    }
-  }
-
-  private void init() {
-    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
-        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
-    try {
-      new RowsFetcher(false).call();
-      if (prefetchEnabled) {
-        this.fetchFuture = executorService.submit(new RowsFetcher(true));
-      }
-    } catch (Exception e) {
-      LOGGER.error(e, "Error occurs while fetching records");
-      throw new RuntimeException(e);
-    }
-  }
-
   /**
-   * fetch rows
+   * Counter to maintain the row counter.
    */
-  private final class RowsFetcher implements Callable<Void> {
-    private boolean isBackupFilling;
+  private int counter = 0;
 
-    private RowsFetcher(boolean isBackupFilling) {
-      this.isBackupFilling = isBackupFilling;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      if (isBackupFilling) {
-        backupBuffer = fetchRows();
-        isBackupFilled = true;
-      } else {
-        currentBuffer = fetchRows();
-      }
-      return null;
-    }
-  }
-
-  private List<Object[]> fetchRows() {
-    if (detailRawQueryResultIterator.hasNext()) {
-      return detailRawQueryResultIterator.next().getRows();
-    } else {
-      return new ArrayList<>();
-    }
-  }
-
-  private void fillDataFromPrefetch() {
-    try {
-      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
-        if (prefetchEnabled) {
-          if (!isBackupFilled) {
-            fetchFuture.get();
-          }
-          // copy backup buffer to current buffer and fill backup buffer asyn
-          currentIdxInBuffer = 0;
-          currentBuffer = backupBuffer;
-          isBackupFilled = false;
-          fetchFuture = executorService.submit(new RowsFetcher(true));
-        } else {
-          currentIdxInBuffer = 0;
-          new RowsFetcher(false).call();
-        }
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
+  private Object[] currentConveretedRawRow = null;
 
   /**
-   * populate a row with index counter increased
+   * LOGGER
    */
-  private void popRow() {
-    fillDataFromPrefetch();
-    currentRawRow = currentBuffer.get(currentIdxInBuffer);
-    currentIdxInBuffer++;
-  }
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RawResultIterator.class.getName());
 
   /**
-   * populate a row with index counter unchanged
+   * batch of the result.
    */
-  private void pickRow() {
-    fillDataFromPrefetch();
-    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+  private RowBatch batch;
+
+  public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
+      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+    this.detailRawQueryResultIterator = detailRawQueryResultIterator;
+    this.sourceSegProperties = sourceSegProperties;
+    this.destinationSegProperties = destinationSegProperties;
   }
 
   @Override
   public boolean hasNext() {
-    fillDataFromPrefetch();
-    if (currentIdxInBuffer < currentBuffer.size()) {
+    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
+      if (detailRawQueryResultIterator.hasNext()) {
+        batch = null;
+        batch = detailRawQueryResultIterator.next();
+        counter = 0; // batch changed so reset the counter.
+      } else {
+        return false;
+      }
+    }
+    if (!checkIfBatchIsProcessedCompletely(batch)) {
       return true;
+    } else {
+      return false;
     }
-
-    return false;
   }
 
   @Override
   public Object[] next() {
+    if (null == batch) { // for 1st time
+      batch = detailRawQueryResultIterator.next();
+    }
+    if (!checkIfBatchIsProcessedCompletely(batch)) {
+      try {
+        if (null != currentConveretedRawRow) {
+          counter++;
+          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+          currentConveretedRawRow = null;
+          return currentConveretedRawRowTemp;
+        }
+        return convertRow(batch.getRawRow(counter++));
+      } catch (KeyGenException e) {
+        LOGGER.error(e.getMessage());
+        return null;
+      }
+    } else { // completed one batch.
+      batch = null;
+      batch = detailRawQueryResultIterator.next();
+      counter = 0;
+    }
     try {
-      popRow();
-      return convertRow(this.currentRawRow);
+      if (null != currentConveretedRawRow) {
+        counter++;
+        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+        currentConveretedRawRow = null;
+        return currentConveretedRawRowTemp;
+      }
+      return convertRow(batch.getRawRow(counter++));
     } catch (KeyGenException e) {
-      throw new RuntimeException(e);
+      LOGGER.error(e.getMessage());
+      return null;
     }
+
   }
 
   /**
@@ -184,22 +125,38 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    * @return
    */
   public Object[] fetchConverted() throws KeyGenException {
-    pickRow();
-    return convertRow(this.currentRawRow);
+    if (null != currentConveretedRawRow) {
+      return currentConveretedRawRow;
+    }
+    if (hasNext()) {
+      Object[] rawRow = batch.getRawRow(counter);
+      currentConveretedRawRow = convertRow(rawRow);
+      return currentConveretedRawRow;
+    }
+    else {
+      return null;
+    }
   }
 
   private Object[] convertRow(Object[] rawRow) throws KeyGenException {
     byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
     long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
-    byte[] convertedBytes =
+    byte[] covertedBytes =
         destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
-    ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(convertedBytes);
+    ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes);
     return rawRow;
   }
 
-  public void close() {
-    if (null != executorService) {
-      executorService.shutdownNow();
+  /**
+   * To check if the batch is processed completely
+   * @param batch
+   * @return
+   */
+  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
+    if (counter < batch.getSize()) {
+      return false;
+    } else {
+      return true;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 001f6c0..dbf87a3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -48,6 +48,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll = {
     sql("DROP TABLE IF EXISTS main")
+    sql("set carbon.search.enabled = false")
     sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
   }
 
@@ -117,6 +118,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
       sql(s"SELECT * FROM main WHERE id='100000'"))
     sql("DROP DATAMAP if exists dm ON TABLE main")
+    sql("set carbon.search.enabled = false")
   }
 
   test("test lucene datamap with search mode 2") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 1f3decc..d21197c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -75,7 +75,7 @@ class HandoffPartition(
  */
 class StreamingRawResultIterator(
     recordReader: CarbonStreamRecordReader
-) extends RawResultIterator(null, null, null, true) {
+) extends RawResultIterator(null, null, null) {
 
   override def hasNext: Boolean = {
     recordReader.nextKeyValue()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index a347313..b0711ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -132,7 +132,7 @@ public class CarbonCompactionExecutor {
         queryModel.setTableBlockInfos(list);
         resultList.add(
             new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties,
-                destinationSegProperties, false));
+                destinationSegProperties));
       }
     }
     return resultList;