You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:59 UTC

[32/50] [abbrv] carbondata git commit: [CARBONDATA-3088][Compaction] support prefetch for compaction

[CARBONDATA-3088][Compaction] support prefetch for compaction

Current compaction performance is low. By adding logs to observe the compaction procedure, we found that in
`CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)`, it will wait about 30ms before submitting a new TablePage producer. Since the method
`addDataToStore` is called in single thread, it will result the waiting every 32000 records since it will collect 32000 records to form a TablePage.

To reduce the waiting time, we can prepare the 32000 records ahead. This an be achived using prefetch.

We will prepare two buffers, one will provide the records to the downstream (`addDataToStore`) and the other one will prepare the records
asynchronously. The first is called working buffer and the second is called backup buffer. Once working buffer is exhausted, the two buffers
will exchange their roles: the backup buffer will be the new working buffer and the old working buffer will be the new backup buffer and it
will be filled asynchronously.

Two parameters are involved for this feature:

1. carbon.detail.batch.size: This is an existed parameter and the default value is 100. This parameter controls the batch size of records that
return to the client. For normal query, it is OK to keep it as 100. But for compaction, since all the records will be operated, we suggest you
to set it to a larger value such as 32000. (32000 is the max rows for a table page that the down stream wants).

2. carbon.compaction.prefetch.enable: This is a new parameter and the default value is `false` (We may change it to `true` later). This
parameter controls whether we will prefetch the records for compation.

By using this prefetch feature, we can enhance the performance for compaction. More test results can be found in the PR description.

This closes #2906


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fedba410
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fedba410
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fedba410

Branch: refs/heads/branch-1.5
Commit: fedba410d8b389ce97c03e1f715a916949275f04
Parents: 415635e
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Mon Nov 5 15:11:09 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 21 22:43:46 2018 +0530

----------------------------------------------------------------------
 .../scan/result/iterator/RawResultIterator.java | 199 ++++++++++++-------
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +-
 .../merger/CarbonCompactionExecutor.java        |   2 +-
 3 files changed, 125 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fedba410/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 29d8751..1febb0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -16,12 +16,21 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.log4j.Logger;
 
@@ -40,12 +49,14 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    */
   private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
-  /**
-   * Counter to maintain the row counter.
-   */
-  private int counter = 0;
-
-  private Object[] currentConveretedRawRow = null;
+  private boolean prefetchEnabled;
+  private List<Object[]> currentBuffer;
+  private List<Object[]> backupBuffer;
+  private int currentIdxInBuffer;
+  private ExecutorService executorService;
+  private Future<Void> fetchFuture;
+  private Object[] currentRawRow = null;
+  private boolean isBackupFilled = false;
 
   /**
    * LOGGER
@@ -53,72 +64,124 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(RawResultIterator.class.getName());
 
-  /**
-   * batch of the result.
-   */
-  private RowBatch batch;
-
   public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
-      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+      boolean isStreamingHandoff) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
+    this.executorService = Executors.newFixedThreadPool(1);
+
+    if (!isStreamingHandoff) {
+      init();
+    }
   }
 
-  @Override
-  public boolean hasNext() {
-    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
-      if (detailRawQueryResultIterator.hasNext()) {
-        batch = null;
-        batch = detailRawQueryResultIterator.next();
-        counter = 0; // batch changed so reset the counter.
-      } else {
-        return false;
+  private void init() {
+    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+    try {
+      new RowsFetcher(false).call();
+      if (prefetchEnabled) {
+        this.fetchFuture = executorService.submit(new RowsFetcher(true));
       }
-    }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      return true;
-    } else {
-      return false;
+    } catch (Exception e) {
+      LOGGER.error("Error occurs while fetching records", e);
+      throw new RuntimeException(e);
     }
   }
 
-  @Override
-  public Object[] next() {
-    if (null == batch) { // for 1st time
-      batch = detailRawQueryResultIterator.next();
+  /**
+   * fetch rows
+   */
+  private final class RowsFetcher implements Callable<Void> {
+    private boolean isBackupFilling;
+
+    private RowsFetcher(boolean isBackupFilling) {
+      this.isBackupFilling = isBackupFilling;
     }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      try {
-        if (null != currentConveretedRawRow) {
-          counter++;
-          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-          currentConveretedRawRow = null;
-          return currentConveretedRawRowTemp;
-        }
-        return convertRow(batch.getRawRow(counter++));
-      } catch (KeyGenException e) {
-        LOGGER.error(e.getMessage());
-        return null;
+
+    @Override
+    public Void call() throws Exception {
+      if (isBackupFilling) {
+        backupBuffer = fetchRows();
+        isBackupFilled = true;
+      } else {
+        currentBuffer = fetchRows();
       }
-    } else { // completed one batch.
-      batch = null;
-      batch = detailRawQueryResultIterator.next();
-      counter = 0;
+      return null;
     }
+  }
+
+  private List<Object[]> fetchRows() throws Exception {
+    List<Object[]> converted = new ArrayList<>();
+    if (detailRawQueryResultIterator.hasNext()) {
+      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+        converted.add(convertRow(r));
+      }
+    }
+    return converted;
+  }
+
+  private void fillDataFromPrefetch() {
     try {
-      if (null != currentConveretedRawRow) {
-        counter++;
-        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-        currentConveretedRawRow = null;
-        return currentConveretedRawRowTemp;
+      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
+        if (prefetchEnabled) {
+          if (!isBackupFilled) {
+            fetchFuture.get();
+          }
+          // copy backup buffer to current buffer and fill backup buffer asyn
+          currentIdxInBuffer = 0;
+          currentBuffer.clear();
+          currentBuffer = backupBuffer;
+          isBackupFilled = false;
+          fetchFuture = executorService.submit(new RowsFetcher(true));
+        } else {
+          currentIdxInBuffer = 0;
+          new RowsFetcher(false).call();
+        }
       }
-      return convertRow(batch.getRawRow(counter++));
-    } catch (KeyGenException e) {
-      LOGGER.error(e.getMessage());
-      return null;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * populate a row with index counter increased
+   */
+  private void popRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+    currentIdxInBuffer++;
+  }
+
+  /**
+   * populate a row with index counter unchanged
+   */
+  private void pickRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+  }
+
+  @Override
+  public boolean hasNext() {
+    fillDataFromPrefetch();
+    if (currentIdxInBuffer < currentBuffer.size()) {
+      return true;
     }
 
+    return false;
+  }
+
+  @Override
+  public Object[] next() {
+    try {
+      popRow();
+      return this.currentRawRow;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -126,17 +189,8 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    * @return
    */
   public Object[] fetchConverted() throws KeyGenException {
-    if (null != currentConveretedRawRow) {
-      return currentConveretedRawRow;
-    }
-    if (hasNext()) {
-      Object[] rawRow = batch.getRawRow(counter);
-      currentConveretedRawRow = convertRow(rawRow);
-      return currentConveretedRawRow;
-    }
-    else {
-      return null;
-    }
+    pickRow();
+    return this.currentRawRow;
   }
 
   private Object[] convertRow(Object[] rawRow) throws KeyGenException {
@@ -148,16 +202,9 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
     return rawRow;
   }
 
-  /**
-   * To check if the batch is processed completely
-   * @param batch
-   * @return
-   */
-  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
-    if (counter < batch.getSize()) {
-      return false;
-    } else {
-      return true;
+  public void close() {
+    if (null != executorService) {
+      executorService.shutdownNow();
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fedba410/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 606aa01..c7c5bdc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -74,7 +74,7 @@ class HandoffPartition(
  */
 class StreamingRawResultIterator(
     recordReader: RecordReader[Void, Any]
-) extends RawResultIterator(null, null, null) {
+) extends RawResultIterator(null, null, null, true) {
 
   override def hasNext: Boolean = {
     recordReader.nextKeyValue()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fedba410/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 7bc7ae1..ea123d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -136,7 +136,7 @@ public class CarbonCompactionExecutor {
         resultList.add(
             new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
                 sourceSegProperties,
-                destinationSegProperties));
+                destinationSegProperties, false));
       }
     }
     return resultList;