Posted to issues@carbondata.apache.org by xuchuanyin <gi...@git.apache.org> on 2018/11/07 12:24:08 UTC

[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...

GitHub user xuchuanyin opened a pull request:

    https://github.com/apache/carbondata/pull/2906

    [CARBONDATA-3088][Compaction] support prefetch for compaction

    Current compaction performance is low. By adding logs to observe the
    compaction procedure, we found that
    `CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)` waits about
    30ms before submitting a new TablePage producer. Since `addDataToStore`
    is called from a single thread, this wait occurs once every 32000
    records, because 32000 records are collected to form a TablePage.
    
    To reduce the waiting time, we can prepare the 32000 records ahead of
    time. This can be achieved using prefetch.
    
    We will prepare two buffers: one provides the records to the
    downstream (`addDataToStore`) and the other prepares the records
    asynchronously. The first is called the working buffer and the second
    the backup buffer. Once the working buffer is exhausted, the two
    buffers exchange their roles: the backup buffer becomes the new working
    buffer, and the old working buffer becomes the new backup buffer and
    is filled asynchronously.
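
The buffer exchange described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code of this PR: `PrefetchingIterator`, `PrefetchDemo` and `drain` are hypothetical names, the upstream is modelled as a plain `Iterator<List<T>>`, and a `null` batch is used here to signal that the upstream is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical double-buffered iterator; an illustration, not the class changed in this PR. */
final class PrefetchingIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<List<T>> batches;          // upstream source, one batch per call
  private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "prefetch-sketch");
    t.setDaemon(true);                              // never keeps the JVM alive
    return t;
  });
  private final Callable<List<T>> fetchTask = this::fetch;
  private List<T> working;                          // buffer served to the caller
  private Future<List<T>> backup;                   // buffer being filled in the background
  private int idx;                                  // next unread position in the working buffer

  PrefetchingIterator(Iterator<List<T>> batches) {
    this.batches = batches;
    this.working = fetch();                         // first batch is loaded synchronously
    this.backup = executor.submit(fetchTask);       // second batch starts loading at once
  }

  /** Returns the next upstream batch, or null once the upstream is exhausted. */
  private List<T> fetch() {
    return batches.hasNext() ? batches.next() : null;
  }

  @Override
  public boolean hasNext() {
    // Swap buffers until the working buffer has an unread row or the data ends.
    while (working != null && idx >= working.size()) {
      swapBuffers();
    }
    return working != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return working.get(idx++);
  }

  /** The backup buffer becomes the working buffer; a new backup fill is submitted. */
  private void swapBuffers() {
    try {
      working = backup.get();                       // blocks only if the prefetch is unfinished
      idx = 0;
      if (working != null) {
        backup = executor.submit(fetchTask);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}

final class PrefetchDemo {
  /** Reads every row through the prefetching iterator and returns them in order. */
  static List<Integer> drain(List<List<Integer>> batches) {
    List<Integer> out = new ArrayList<>();
    try (PrefetchingIterator<Integer> it = new PrefetchingIterator<>(batches.iterator())) {
      while (it.hasNext()) {
        out.add(it.next());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(
        drain(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5))));
  }
}
```

Only one fill task is in flight at a time, and the consumer touches the upstream only through `Future.get()`, so the sketch needs no extra locking. Running `PrefetchDemo.main` prints `[1, 2, 3, 4, 5]`.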
    
    Two parameters are involved for this feature:
    
    1. carbon.detail.batch.size: This is an existing parameter whose default
    value is 100. It controls the batch size of records returned to the
    client. For a normal query, it is OK to keep it at 100. But for
    compaction, since all the records will be processed, we suggest setting
    it to a larger value such as 32000 (the maximum number of rows in a
    table page that the downstream wants).
    
    2. carbon.compaction.prefetch.enable: This is a new parameter whose
    default value is `false` (we may change it to `true` later). It
    controls whether records are prefetched for compaction.
    
    By using this prefetch feature, we can enhance the performance for
    compaction. More test results can be found in the PR description.
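
As a rough illustration of how the two parameters above might be read, the sketch below uses `java.util.Properties` as a stand-in for CarbonData's own property store. The class `CompactionPrefetchConfig` is hypothetical. The property names and defaults come from the description above, and the case-insensitive comparison with "true" mirrors the check in this PR's diff.

```java
import java.util.Properties;

/** Hypothetical holder for the two settings; java.util.Properties stands in for CarbonProperties. */
final class CompactionPrefetchConfig {
  static final String BATCH_SIZE_KEY = "carbon.detail.batch.size";
  static final String PREFETCH_KEY = "carbon.compaction.prefetch.enable";

  final int batchSize;
  final boolean prefetchEnabled;

  CompactionPrefetchConfig(Properties props) {
    // Defaults follow the description: batch size 100, prefetch disabled.
    this.batchSize = Integer.parseInt(props.getProperty(BATCH_SIZE_KEY, "100"));
    this.prefetchEnabled = props.getProperty(PREFETCH_KEY, "false").equalsIgnoreCase("true");
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // Values suggested in the description for compaction.
    props.setProperty(BATCH_SIZE_KEY, "32000");
    props.setProperty(PREFETCH_KEY, "true");
    CompactionPrefetchConfig config = new CompactionPrefetchConfig(props);
    System.out.println(config.batchSize + " " + config.prefetchEnabled);
  }
}
```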
    
    
    code branch | prefetch | carbon.detail.batch.size (default 100) | time taken for load 1 (s) | time taken for load 2 (s) | time taken for load 3 (s) | time taken to compact 3 loads (s) | Enhancement
    -- | -- | -- | -- | -- | -- | -- | --
    master | NA | 100 | 447.4 | 445.9 | 450.1 | 661.3 | Baseline
    master | NA | 32000 | 441.5 | 454.4 | 456.8 | 641.2 | +3.0%
    current PR | enable | 100 | 445.3 | 450.2 | 445.3 | 411.8 | +37.7%
    current PR | enable | 32000 | 438.7 | 446.8 | 441.8 | 333.1 | +49.6%
    current PR | disable | 100 | 458.1 | 459.4 | 450.9 | 659.5 | +0.3%
    current PR | disable | 32000 |   |   |   |   |  
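
The Enhancement figures are consistent with the relative reduction in compaction time against the 661.3 s baseline, that is (baseline - time) / baseline. The formula is inferred from the numbers in the table and is not stated in the PR. `CompactionSpeedup` is a hypothetical helper that reproduces the arithmetic.

```java
import java.util.Locale;

/** Hypothetical helper reproducing the Enhancement column from the compaction times. */
final class CompactionSpeedup {
  /** Relative reduction in compaction time versus the baseline, in percent. */
  static double enhancementPercent(double baselineSeconds, double seconds) {
    return (baselineSeconds - seconds) / baselineSeconds * 100.0;
  }

  /** Formats the reduction with an explicit sign and one decimal place. */
  static String format(double baselineSeconds, double seconds) {
    return String.format(Locale.ROOT, "%+.1f%%", enhancementPercent(baselineSeconds, seconds));
  }

  public static void main(String[] args) {
    double baseline = 661.3;                       // master, batch size 100
    double[] compactionSeconds = {641.2, 411.8, 333.1, 659.5};
    for (double seconds : compactionSeconds) {
      System.out.println(seconds + " s -> " + format(baseline, seconds));
    }
  }
}
```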
    
    
    
    Be sure to do all of the following checklist to help us incorporate 
    your contribution quickly and easily:
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 181105_opt_prefetch_compaction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2906
    
----
commit 4406ee68b0b12039f3ee2a1cc8dcc4469bb639d0
Author: xuchuanyin <xu...@...>
Date:   2018-11-05T07:11:09Z

    support prefetch for compaction
    
    Current compaction performance is low. By adding logs to observe the
    compaction procedure, we found that
    `CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)` waits about
    30ms before submitting a new TablePage producer. Since `addDataToStore`
    is called from a single thread, this wait occurs once every 32000
    records, because 32000 records are collected to form a TablePage.
    
    To reduce the waiting time, we can prepare the 32000 records ahead of
    time. This can be achieved using prefetch.
    
    We will prepare two buffers: one provides the records to the
    downstream (`addDataToStore`) and the other prepares the records
    asynchronously. The first is called the working buffer and the second
    the backup buffer. Once the working buffer is exhausted, the two
    buffers exchange their roles: the backup buffer becomes the new working
    buffer, and the old working buffer becomes the new backup buffer and
    is filled asynchronously.
    
    Two parameters are involved for this feature:
    
    1. carbon.detail.batch.size: This is an existing parameter whose default
    value is 100. It controls the batch size of records returned to the
    client. For a normal query, it is OK to keep it at 100. But for
    compaction, since all the records will be processed, we suggest setting
    it to a larger value such as 32000 (the maximum number of rows in a
    table page that the downstream wants).
    
    2. carbon.compaction.prefetch.enable: This is a new parameter whose
    default value is `false` (we may change it to `true` later). It
    controls whether records are prefetched for compaction.
    
    By using this prefetch feature, we can enhance the performance for
    compaction. More test results can be found in the PR description.

----


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    LGTM


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1328/



---

[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2906#discussion_r231755219
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -40,103 +49,150 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       /**
        * LOGGER
        */
       private static final Logger LOGGER =
           LogServiceFactory.getLogService(RawResultIterator.class.getName());
     
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    -
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandoff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandoff) {
    +      init();
    +    }
       }
     
    -  @Override
    -  public boolean hasNext() {
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error("Error occurs while fetching records", e);
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
    +
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  }
    +
    +  private List<Object[]> fetchRows() throws Exception {
    +    List<Object[]> converted = new ArrayList<>();
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
    +        converted.add(convertRow(r));
    +      }
    +      return converted;
         } else {
    -      return false;
    +      return new ArrayList<>();
    --- End diff --
    
    fine~


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Please note that this PR is nearly the same as the modification in PR #2133, except that we also fold the `convertRow` step into the backup buffer filling procedure.


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1539/



---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1325/



---

[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...

Posted by Sssan520 <gi...@git.apache.org>.
Github user Sssan520 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2906#discussion_r231738236
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -40,103 +49,150 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       /**
        * LOGGER
        */
       private static final Logger LOGGER =
           LogServiceFactory.getLogService(RawResultIterator.class.getName());
     
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    -
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandoff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandoff) {
    +      init();
    +    }
       }
     
    -  @Override
    -  public boolean hasNext() {
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error("Error occurs while fetching records", e);
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
    +
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  }
    +
    +  private List<Object[]> fetchRows() throws Exception {
    +    List<Object[]> converted = new ArrayList<>();
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
    +        converted.add(convertRow(r));
    +      }
    +      return converted;
         } else {
    -      return false;
    +      return new ArrayList<>();
    --- End diff --
    
    Since the `converted` object has already been initialized, it can simply be returned when there is no data to add.
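
The suggestion could look roughly like the sketch below. It is an illustration only: `FetchRowsSketch` is a hypothetical class, the upstream is modelled as an `Iterator<List<Object[]>>` instead of `CarbonIterator<RowBatch>`, and `convertRow` is passed in as a `UnaryOperator` stand-in for the real method.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical, simplified form of fetchRows with a single return statement. */
final class FetchRowsSketch {
  static List<Object[]> fetchRows(
      Iterator<List<Object[]>> batches, UnaryOperator<Object[]> convertRow) {
    List<Object[]> converted = new ArrayList<>();
    if (batches.hasNext()) {
      for (Object[] r : batches.next()) {
        converted.add(convertRow.apply(r));
      }
    }
    // When there is no further batch, the already-initialized empty list is returned.
    return converted;
  }

  public static void main(String[] args) {
    List<List<Object[]>> none = new ArrayList<>();
    System.out.println(fetchRows(none.iterator(), r -> r).size());
  }
}
```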


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1537/



---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9584/



---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1536/



---

[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2906#discussion_r231492037
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -40,103 +49,150 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       /**
        * LOGGER
        */
       private static final Logger LOGGER =
           LogServiceFactory.getLogService(RawResultIterator.class.getName());
     
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    -
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandoff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandoff) {
    +      init();
    +    }
       }
     
    -  @Override
    -  public boolean hasNext() {
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error("Error occurs while fetching records", e);
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
    +
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  }
    +
    +  private List<Object[]> fetchRows() throws Exception {
    +    List<Object[]> converted = new ArrayList<>();
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
    +        converted.add(convertRow(r));
    --- End diff --
    
    FYI: This is the key difference from PR #2133.


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1326/



---

[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2906


---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9585/



---

[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2906
  
    Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9587/



---