Posted to issues@carbondata.apache.org by xuchuanyin <gi...@git.apache.org> on 2018/04/03 01:39:34 UTC

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

GitHub user xuchuanyin opened a pull request:

    https://github.com/apache/carbondata/pull/2133

    [CARBONDATA-2304][Compaction] Prefetch rowbatch during compaction

    Add a configuration to enable prefetch during compaction.
    
    During compaction, CarbonData queries the source segments and retrieves the rows, then sorts the rows and produces the final carbondata file.
    
    Currently, retrieving the rows is a performance bottleneck, so prefetching the rows improves compaction performance.
    
    In my local tests, compacting 4 segments of 100 thousand rows each takes 30s with prefetch and 50s without.
    
    In my tests on a larger cluster, compacting 6 segments of 18GB raw data each takes 45min with prefetch and 57min without.
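    
    As a rough illustration of how the feature would be switched on, here is a minimal sketch in Java. It uses the constant added by this PR, `CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE`; this thread does not show the literal key string behind that constant, so treat any literal key as an assumption and prefer the constant itself.
    
        import org.apache.carbondata.core.constants.CarbonCommonConstants;
        import org.apache.carbondata.core.util.CarbonProperties;
    
        public class EnableCompactionPrefetch {
          public static void main(String[] args) {
            // Enable rowbatch prefetch before triggering ALTER TABLE ... COMPACT.
            // CARBON_COMPACTION_PREFETCH_ENABLE is the constant introduced by this PR;
            // its string value is resolved by CarbonData itself, so no key is hard-coded here.
            CarbonProperties.getInstance()
                .addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, "true");
          }
        }
    
    The same property can also be set in carbon.properties so that every compaction picks it up without code changes.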
    
    Be sure to complete the following checklist to help us incorporate your contribution quickly and easily:
    
     - [x] Any interfaces changed?
     `NO`
     - [x] Any backward compatibility impacted?
     `NO`
     - [x] Document update required?
    `A configuration is added; documentation will be updated later`
     - [x] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
    `Yes`
            - How it is tested? Please attach test report.
    `Tested locally and on a 3-node cluster`
            - Is it a performance related change? Please attach the performance test report.
    `Compaction performance improved by more than 25%`
            - Any additional information to help reviewers in testing this change.
           
     - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    `Not related`


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 0402_compaction_prefetch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2133
    
----
commit 41869effa326052b46088f68dd1d6ccc5f7525e5
Author: xuchuanyin <xu...@...>
Date:   2018-04-02T12:38:17Z

    Prefetch rowbatch during compaction
    
    Add a configuration to enable prefetch during compaction.

----


---

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2133#discussion_r181136505
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -39,106 +53,131 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    -
    -  /**
    -   * LOGGER
    -   */
    -  private static final LogService LOGGER =
    -      LogServiceFactory.getLogService(RawResultIterator.class.getName());
    -
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandOff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandOff) {
    +      init();
    +    }
       }
     
    -  @Override public boolean hasNext() {
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Error occurs while fetching records");
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
     
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    +  }
     
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  private List<Object[]> fetchRows() {
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      return detailRawQueryResultIterator.next().getRows();
         } else {
    -      return false;
    +      return new ArrayList<>();
         }
       }
     
    -  @Override public Object[] next() {
    -    if (null == batch) { // for 1st time
    -      batch = detailRawQueryResultIterator.next();
    -    }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      try {
    -        if (null != currentConveretedRawRow) {
    -          counter++;
    -          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -          currentConveretedRawRow = null;
    -          return currentConveretedRawRowTemp;
    +  private void fillDataFromPrefetch() {
    +    try {
    +      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
    +        if (prefetchEnabled) {
    +          if (!isBackupFilled) {
    +            fetchFuture.get();
    +          }
    +          // copy backup buffer to current buffer and fill backup buffer asyn
    +          currentIdxInBuffer = 0;
    +          currentBuffer = backupBuffer;
    +          isBackupFilled = false;
    +          fetchFuture = executorService.submit(new RowsFetcher(true));
    +        } else {
    +          currentIdxInBuffer = 0;
    +          new RowsFetcher(false).call();
             }
    -        return convertRow(batch.getRawRow(counter++));
    -      } catch (KeyGenException e) {
    -        LOGGER.error(e.getMessage());
    -        return null;
           }
    -    } else { // completed one batch.
    -      batch = null;
    -      batch = detailRawQueryResultIterator.next();
    -      counter = 0;
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
         }
    -    try {
    -      if (null != currentConveretedRawRow) {
    -        counter++;
    -        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -        currentConveretedRawRow = null;
    -        return currentConveretedRawRowTemp;
    -      }
    +  }
     
    -      return convertRow(batch.getRawRow(counter++));
    -    } catch (KeyGenException e) {
    -      LOGGER.error(e.getMessage());
    -      return null;
    +  private void popRow() {
    --- End diff --
    
    please add comment


---

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2133#discussion_r181135852
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1642,6 +1642,14 @@
     
       public static final String CARBON_SEARCH_MODE_THREAD_DEFAULT = "3";
     
    +  /*
    +   * whether to enable prefetch during compaction
    --- End diff --
    
    Can you describe more on what is prefetched


---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    LGTM


---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5094/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3632/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4323/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    @jackylk review comments have been addressed


---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4765/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4270/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4854/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4772/



---

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2133


---

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2133#discussion_r181136264
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -39,106 +53,131 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    -
    -  /**
    -   * LOGGER
    -   */
    -  private static final LogService LOGGER =
    -      LogServiceFactory.getLogService(RawResultIterator.class.getName());
    -
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandOff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandOff) {
    +      init();
    +    }
       }
     
    -  @Override public boolean hasNext() {
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Error occurs while fetching records");
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
     
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    +  }
     
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  private List<Object[]> fetchRows() {
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      return detailRawQueryResultIterator.next().getRows();
         } else {
    -      return false;
    +      return new ArrayList<>();
         }
       }
     
    -  @Override public Object[] next() {
    -    if (null == batch) { // for 1st time
    -      batch = detailRawQueryResultIterator.next();
    -    }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      try {
    -        if (null != currentConveretedRawRow) {
    -          counter++;
    -          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -          currentConveretedRawRow = null;
    -          return currentConveretedRawRowTemp;
    +  private void fillDataFromPrefetch() {
    +    try {
    +      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
    +        if (prefetchEnabled) {
    +          if (!isBackupFilled) {
    +            fetchFuture.get();
    +          }
    +          // copy backup buffer to current buffer and fill backup buffer asyn
    +          currentIdxInBuffer = 0;
    +          currentBuffer = backupBuffer;
    +          isBackupFilled = false;
    +          fetchFuture = executorService.submit(new RowsFetcher(true));
    +        } else {
    +          currentIdxInBuffer = 0;
    +          new RowsFetcher(false).call();
             }
    -        return convertRow(batch.getRawRow(counter++));
    -      } catch (KeyGenException e) {
    -        LOGGER.error(e.getMessage());
    -        return null;
           }
    -    } else { // completed one batch.
    -      batch = null;
    -      batch = detailRawQueryResultIterator.next();
    -      counter = 0;
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
         }
    -    try {
    -      if (null != currentConveretedRawRow) {
    -        counter++;
    -        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -        currentConveretedRawRow = null;
    -        return currentConveretedRawRowTemp;
    -      }
    +  }
     
    -      return convertRow(batch.getRawRow(counter++));
    -    } catch (KeyGenException e) {
    -      LOGGER.error(e.getMessage());
    -      return null;
    +  private void popRow() {
    +    fillDataFromPrefetch();
    +    currentRawRow = currentBuffer.get(currentIdxInBuffer);
    +    currentIdxInBuffer++;
    +  }
    +
    +  private void pickRow() {
    +    fillDataFromPrefetch();
    +    currentRawRow = currentBuffer.get(currentIdxInBuffer);
    +  }
    +
    +  @Override public boolean hasNext() {
    --- End diff --
    
    move @Override to previous line


---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3868/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3544/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3537/



---

[GitHub] carbondata issue #2133: [CARBONDATA-2304][Compaction] Prefetch rowbatch duri...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2133
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4275/



---