Posted to issues@carbondata.apache.org by xuchuanyin <gi...@git.apache.org> on 2018/11/07 12:24:08 UTC
[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...
GitHub user xuchuanyin opened a pull request:
https://github.com/apache/carbondata/pull/2906
[CARBONDATA-3088][Compaction] support prefetch for compaction
Current compaction performance is low. By adding logs to observe the
compaction procedure, we found that
`CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)` waits about
30ms before submitting a new TablePage producer. Since `addDataToStore`
is called from a single thread, this wait recurs every 32000 records,
because 32000 records are collected to form one TablePage.
To reduce the waiting time, we can prepare the 32000 records ahead of
time. This can be achieved using prefetch.
We will prepare two buffers: one provides records to the downstream
(`addDataToStore`) while the other prepares records asynchronously. The
first is called the working buffer and the second is called the backup
buffer. Once the working buffer is exhausted, the two buffers exchange
roles: the backup buffer becomes the new working buffer, and the old
working buffer becomes the new backup buffer and is filled
asynchronously.
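The buffer exchange described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `DoubleBufferIterator`, `DoubleBufferDemo`, and the `await` helper are hypothetical names, the upstream is modeled as a plain `Iterator<List<T>>`, and an empty batch is used to signal that the upstream is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical double-buffered iterator; an illustration, not RawResultIterator. */
final class DoubleBufferIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<List<T>> source;   // upstream batches
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Callable<List<T>> fetchTask = this::fetch;
  private List<T> working;                  // buffer currently served to the caller
  private Future<List<T>> backup;           // buffer being filled asynchronously
  private int idx;                          // read position in the working buffer

  DoubleBufferIterator(Iterator<List<T>> source) {
    this.source = source;
    this.working = fetch();                 // first batch is loaded synchronously
    if (working.isEmpty()) {
      executor.shutdown();                  // nothing to prefetch
    } else {
      this.backup = executor.submit(fetchTask);
    }
  }

  /** Returns the next non-empty batch, or an empty list once the source is exhausted. */
  private List<T> fetch() {
    while (source.hasNext()) {
      List<T> batch = source.next();
      if (!batch.isEmpty()) {
        return new ArrayList<>(batch);
      }
    }
    return new ArrayList<>();
  }

  @Override
  public boolean hasNext() {
    if (idx < working.size()) {
      return true;
    }
    if (backup == null) {
      return false;
    }
    working = await(backup);                // the backup buffer becomes the working buffer
    idx = 0;
    if (working.isEmpty()) {                // source exhausted
      backup = null;
      executor.shutdown();
      return false;
    }
    backup = executor.submit(fetchTask);    // refill the other buffer in the background
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return working.get(idx++);
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }

  private static <R> R await(Future<R> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for prefetch", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Prefetch failed", e.getCause());
    }
  }
}

final class DoubleBufferDemo {
  public static void main(String[] args) {
    List<List<Integer>> batches =
        Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
    try (DoubleBufferIterator<Integer> it = new DoubleBufferIterator<>(batches.iterator())) {
      while (it.hasNext()) {
        System.out.println(it.next());
      }
    }
  }
}
```

In this sketch only one fetch is in flight at a time, so the upstream iterator is never touched by two threads at once; `Future.get()` provides the hand-off between the prefetch thread and the consumer.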
Two parameters are involved in this feature:
1. carbon.detail.batch.size: This is an existing parameter and the default
value is 100. This parameter controls the batch size of records that
are returned to the client. For a normal query, it is OK to keep it as
100. But for compaction, since all the records will be processed, we
suggest setting it to a larger value such as 32000. (32000 is the max
number of rows for a table page that the downstream wants).
2. carbon.compaction.prefetch.enable: This is a new parameter and the
default value is `false` (We may change it to `true` later). This
parameter controls whether we will prefetch the records for compaction.
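As a rough sketch of how the two parameters interact, the snippet below reads them from a plain `java.util.Properties` object. The property keys and default values come from the description above; `CompactionPrefetchConfig` is a hypothetical class, and `java.util.Properties` is only a stand-in for CarbonData's own property handling.

```java
import java.util.Properties;

/** Hypothetical holder for the two settings; not part of CarbonData. */
final class CompactionPrefetchConfig {
  static final String DETAIL_BATCH_SIZE = "carbon.detail.batch.size";
  static final String PREFETCH_ENABLE = "carbon.compaction.prefetch.enable";

  final int batchSize;
  final boolean prefetchEnabled;

  CompactionPrefetchConfig(Properties props) {
    // Defaults as stated in the description: 100 rows per batch, prefetch off.
    this.batchSize = Integer.parseInt(props.getProperty(DETAIL_BATCH_SIZE, "100"));
    this.prefetchEnabled =
        props.getProperty(PREFETCH_ENABLE, "false").equalsIgnoreCase("true");
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(DETAIL_BATCH_SIZE, "32000");   // suggested value for compaction
    props.setProperty(PREFETCH_ENABLE, "true");
    CompactionPrefetchConfig config = new CompactionPrefetchConfig(props);
    System.out.println(config.batchSize + " " + config.prefetchEnabled);
  }
}
```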
By using this prefetch feature, we can improve compaction performance.
More test results can be found in the PR description.
code branch | prefetch | carbon.detail.batch.size (default 100) | time taken for load 1 (s) | time taken for load 2 (s) | time taken for load 3 (s) | time taken to compact 3 loads (s) | Enhancement
-- | -- | -- | -- | -- | -- | -- | --
master | NA | 100 | 447.4 | 445.9 | 450.1 | 661.3 | Baseline
master | NA | 32000 | 441.5 | 454.4 | 456.8 | 641.2 | +3.0%
current PR | enable | 100 | 445.3 | 450.2 | 445.3 | 411.8 | +37.7%
current PR | enable | 32000 | 438.7 | 446.8 | 441.8 | 333.1 | +49.6%
current PR | disable | 100 | 458.1 | 459.4 | 450.9 | 659.5 | +0.3%
current PR | disable | 32000 | | | | |
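The Enhancement figures are consistent with the relative reduction in compaction time against the baseline row (661.3 s). The formula below is inferred from the numbers in the table rather than stated in the PR, and `CompactionEnhancement` is a hypothetical helper used only to check the arithmetic.

```java
import java.util.Locale;

/** Hypothetical helper that recomputes the Enhancement column. */
final class CompactionEnhancement {
  /** Relative reduction in compaction time versus the baseline, in percent. */
  static double enhancementPercent(double baselineSeconds, double seconds) {
    return (baselineSeconds - seconds) / baselineSeconds * 100.0;
  }

  static String format(double baselineSeconds, double seconds) {
    return String.format(Locale.ROOT, "+%.1f%%", enhancementPercent(baselineSeconds, seconds));
  }

  public static void main(String[] args) {
    double baseline = 661.3;                        // master, batch size 100
    System.out.println(format(baseline, 641.2));    // master, batch size 32000
    System.out.println(format(baseline, 411.8));    // PR, prefetch on, batch size 100
    System.out.println(format(baseline, 333.1));    // PR, prefetch on, batch size 32000
    System.out.println(format(baseline, 659.5));    // PR, prefetch off, batch size 100
  }
}
```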
Be sure to do all of the following checklist to help us incorporate
your contribution quickly and easily:
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/xuchuanyin/carbondata 181105_opt_prefetch_compaction
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/2906.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2906
----
commit 4406ee68b0b12039f3ee2a1cc8dcc4469bb639d0
Author: xuchuanyin <xu...@...>
Date: 2018-11-05T07:11:09Z
support prefetch for compaction
----
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2906
LGTM
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1328/
---
[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...
Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231755219
--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
@@ -40,103 +49,150 @@
*/
private CarbonIterator<RowBatch> detailRawQueryResultIterator;
- /**
- * Counter to maintain the row counter.
- */
- private int counter = 0;
-
- private Object[] currentConveretedRawRow = null;
+ private boolean prefetchEnabled;
+ private List<Object[]> currentBuffer;
+ private List<Object[]> backupBuffer;
+ private int currentIdxInBuffer;
+ private ExecutorService executorService;
+ private Future<Void> fetchFuture;
+ private Object[] currentRawRow = null;
+ private boolean isBackupFilled = false;
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(RawResultIterator.class.getName());
- /**
- * batch of the result.
- */
- private RowBatch batch;
-
public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
- SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+ SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+ boolean isStreamingHandoff) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
this.destinationSegProperties = destinationSegProperties;
+ this.executorService = Executors.newFixedThreadPool(1);
+
+ if (!isStreamingHandoff) {
+ init();
+ }
}
- @Override
- public boolean hasNext() {
- if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
- if (detailRawQueryResultIterator.hasNext()) {
- batch = null;
- batch = detailRawQueryResultIterator.next();
- counter = 0; // batch changed so reset the counter.
+ private void init() {
+ this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+ try {
+ new RowsFetcher(false).call();
+ if (prefetchEnabled) {
+ this.fetchFuture = executorService.submit(new RowsFetcher(true));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error occurs while fetching records", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * fetch rows
+ */
+ private final class RowsFetcher implements Callable<Void> {
+ private boolean isBackupFilling;
+
+ private RowsFetcher(boolean isBackupFilling) {
+ this.isBackupFilling = isBackupFilling;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (isBackupFilling) {
+ backupBuffer = fetchRows();
+ isBackupFilled = true;
} else {
- return false;
+ currentBuffer = fetchRows();
}
+ return null;
}
- if (!checkIfBatchIsProcessedCompletely(batch)) {
- return true;
+ }
+
+ private List<Object[]> fetchRows() throws Exception {
+ List<Object[]> converted = new ArrayList<>();
+ if (detailRawQueryResultIterator.hasNext()) {
+ for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+ converted.add(convertRow(r));
+ }
+ return converted;
} else {
- return false;
+ return new ArrayList<>();
--- End diff --
fine~
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2906
Please note that this PR is nearly the same as the modification in PR #2133, except that we also fold the `convertRow` step into the backup buffer procedure.
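A minimal sketch of what running the conversion inside the prefetch task means: the rows are converted on the prefetch thread while the buffer is being filled, so the consumer receives rows that are already converted. All names here (`ConvertInPrefetchSketch`, `fetchAndConvert`, `prefetch`, the thread name) are hypothetical, and `convertRow` is a stand-in that only tags each row with the thread it ran on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConvertInPrefetchSketch {
  /** Stand-in for the real conversion; records which thread did the work. */
  static String convertRow(String raw) {
    return raw.toUpperCase(Locale.ROOT) + "@" + Thread.currentThread().getName();
  }

  /** Converts a whole batch; intended to run on the prefetch thread. */
  static List<String> fetchAndConvert(List<String> rawBatch) {
    List<String> converted = new ArrayList<>(rawBatch.size());
    for (String row : rawBatch) {
      converted.add(convertRow(row));
    }
    return converted;
  }

  /** Fills one buffer on a background thread and returns it once ready. */
  static List<String> prefetch(List<String> rawBatch) {
    ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "prefetch-thread"));
    try {
      Callable<List<String>> task = () -> fetchAndConvert(rawBatch);
      return executor.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    // prints [R1@prefetch-thread, R2@prefetch-thread]
    System.out.println(prefetch(Arrays.asList("r1", "r2")));
  }
}
```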
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1539/
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1325/
---
[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...
Posted by Sssan520 <gi...@git.apache.org>.
Github user Sssan520 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231738236
--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
@@ -40,103 +49,150 @@
*/
private CarbonIterator<RowBatch> detailRawQueryResultIterator;
- /**
- * Counter to maintain the row counter.
- */
- private int counter = 0;
-
- private Object[] currentConveretedRawRow = null;
+ private boolean prefetchEnabled;
+ private List<Object[]> currentBuffer;
+ private List<Object[]> backupBuffer;
+ private int currentIdxInBuffer;
+ private ExecutorService executorService;
+ private Future<Void> fetchFuture;
+ private Object[] currentRawRow = null;
+ private boolean isBackupFilled = false;
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(RawResultIterator.class.getName());
- /**
- * batch of the result.
- */
- private RowBatch batch;
-
public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
- SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+ SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+ boolean isStreamingHandoff) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
this.destinationSegProperties = destinationSegProperties;
+ this.executorService = Executors.newFixedThreadPool(1);
+
+ if (!isStreamingHandoff) {
+ init();
+ }
}
- @Override
- public boolean hasNext() {
- if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
- if (detailRawQueryResultIterator.hasNext()) {
- batch = null;
- batch = detailRawQueryResultIterator.next();
- counter = 0; // batch changed so reset the counter.
+ private void init() {
+ this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+ try {
+ new RowsFetcher(false).call();
+ if (prefetchEnabled) {
+ this.fetchFuture = executorService.submit(new RowsFetcher(true));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error occurs while fetching records", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * fetch rows
+ */
+ private final class RowsFetcher implements Callable<Void> {
+ private boolean isBackupFilling;
+
+ private RowsFetcher(boolean isBackupFilling) {
+ this.isBackupFilling = isBackupFilling;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (isBackupFilling) {
+ backupBuffer = fetchRows();
+ isBackupFilled = true;
} else {
- return false;
+ currentBuffer = fetchRows();
}
+ return null;
}
- if (!checkIfBatchIsProcessedCompletely(batch)) {
- return true;
+ }
+
+ private List<Object[]> fetchRows() throws Exception {
+ List<Object[]> converted = new ArrayList<>();
+ if (detailRawQueryResultIterator.hasNext()) {
+ for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+ converted.add(convertRow(r));
+ }
+ return converted;
} else {
- return false;
+ return new ArrayList<>();
--- End diff --
Since the object `converted` has already been initialized, it can simply be returned when there is no data to add.
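The suggestion could look roughly like the sketch below, which returns the same list on both paths instead of allocating a second empty one. `FetchRowsSketch` is a hypothetical class, the upstream is modeled as a plain `Iterator<List<Object[]>>`, and `convertRow` is a placeholder for the real conversion.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class FetchRowsSketch {
  /** Placeholder for the real conversion step. */
  static Object[] convertRow(Object[] raw) {
    return raw.clone();
  }

  static List<Object[]> fetchRows(Iterator<List<Object[]>> batches) {
    List<Object[]> converted = new ArrayList<>();
    if (batches.hasNext()) {
      for (Object[] row : batches.next()) {
        converted.add(convertRow(row));
      }
    }
    // Still empty when the upstream is exhausted, so no else branch is needed.
    return converted;
  }

  public static void main(String[] args) {
    List<Object[]> batch = new ArrayList<>();
    batch.add(new Object[] {1, "a"});
    List<List<Object[]>> batches = new ArrayList<>();
    batches.add(batch);
    Iterator<List<Object[]>> it = batches.iterator();
    System.out.println(fetchRows(it).size());   // one row in the first batch
    System.out.println(fetchRows(it).size());   // upstream exhausted
  }
}
```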
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1537/
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9584/
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1536/
---
[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...
Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231492037
--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
@@ -40,103 +49,150 @@
*/
private CarbonIterator<RowBatch> detailRawQueryResultIterator;
- /**
- * Counter to maintain the row counter.
- */
- private int counter = 0;
-
- private Object[] currentConveretedRawRow = null;
+ private boolean prefetchEnabled;
+ private List<Object[]> currentBuffer;
+ private List<Object[]> backupBuffer;
+ private int currentIdxInBuffer;
+ private ExecutorService executorService;
+ private Future<Void> fetchFuture;
+ private Object[] currentRawRow = null;
+ private boolean isBackupFilled = false;
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(RawResultIterator.class.getName());
- /**
- * batch of the result.
- */
- private RowBatch batch;
-
public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
- SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+ SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+ boolean isStreamingHandoff) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
this.destinationSegProperties = destinationSegProperties;
+ this.executorService = Executors.newFixedThreadPool(1);
+
+ if (!isStreamingHandoff) {
+ init();
+ }
}
- @Override
- public boolean hasNext() {
- if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
- if (detailRawQueryResultIterator.hasNext()) {
- batch = null;
- batch = detailRawQueryResultIterator.next();
- counter = 0; // batch changed so reset the counter.
+ private void init() {
+ this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+ try {
+ new RowsFetcher(false).call();
+ if (prefetchEnabled) {
+ this.fetchFuture = executorService.submit(new RowsFetcher(true));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error occurs while fetching records", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * fetch rows
+ */
+ private final class RowsFetcher implements Callable<Void> {
+ private boolean isBackupFilling;
+
+ private RowsFetcher(boolean isBackupFilling) {
+ this.isBackupFilling = isBackupFilling;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (isBackupFilling) {
+ backupBuffer = fetchRows();
+ isBackupFilled = true;
} else {
- return false;
+ currentBuffer = fetchRows();
}
+ return null;
}
- if (!checkIfBatchIsProcessedCompletely(batch)) {
- return true;
+ }
+
+ private List<Object[]> fetchRows() throws Exception {
+ List<Object[]> converted = new ArrayList<>();
+ if (detailRawQueryResultIterator.hasNext()) {
+ for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+ converted.add(convertRow(r));
--- End diff --
FYI: This is the key difference from PR #2133
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1326/
---
[GitHub] carbondata pull request #2906: [CARBONDATA-3088][Compaction] support prefetc...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/2906
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9585/
---
[GitHub] carbondata issue #2906: [CARBONDATA-3088][Compaction] support prefetch for c...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9587/
---