You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/08 06:31:05 UTC
carbondata git commit: [HOTFIX] Modified code to fix the degrade in
compaction performance
Repository: carbondata
Updated Branches:
refs/heads/master 01115872e -> 45b8d18b7
[HOTFIX] Modified code to fix the degrade in compaction performance
Problem
Compaction performance for 3.5 billion records degraded by 16-20%
Analysis:
Code modification in RawResultIterator.java has caused the problem, wherein a few extra checks are performed as compared to the previous code
Fix
Revert the changes in the RawResultIterator class
This closes #2613
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/45b8d18b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/45b8d18b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/45b8d18b
Branch: refs/heads/master
Commit: 45b8d18b7ff416e75d474b001cfed257bb0f96ad
Parents: 0111587
Author: m00258959 <ma...@huawei.com>
Authored: Tue Aug 7 13:24:16 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Aug 8 12:00:52 2018 +0530
----------------------------------------------------------------------
.../scan/result/iterator/RawResultIterator.java | 205 ++++++++-----------
.../detailquery/SearchModeTestCase.scala | 2 +
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 2 +-
.../merger/CarbonCompactionExecutor.java | 2 +-
4 files changed, 85 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/45b8d18b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 94cea91..efa5b8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -16,22 +16,13 @@
*/
package org.apache.carbondata.core.scan.result.iterator;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.core.util.CarbonProperties;
/**
* This is a wrapper iterator over the detail raw query iterator.
@@ -39,11 +30,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
* This will handle the batch results and will iterate on the batches and give single row.
*/
public class RawResultIterator extends CarbonIterator<Object[]> {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RawResultIterator.class.getName());
private final SegmentProperties sourceSegProperties;
@@ -53,130 +39,85 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
*/
private CarbonIterator<RowBatch> detailRawQueryResultIterator;
- private boolean prefetchEnabled;
- private List<Object[]> currentBuffer;
- private List<Object[]> backupBuffer;
- private int currentIdxInBuffer;
- private ExecutorService executorService;
- private Future<Void> fetchFuture;
- private Object[] currentRawRow = null;
- private boolean isBackupFilled = false;
-
- public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
- SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
- boolean isStreamingHandOff) {
- this.detailRawQueryResultIterator = detailRawQueryResultIterator;
- this.sourceSegProperties = sourceSegProperties;
- this.destinationSegProperties = destinationSegProperties;
- this.executorService = Executors.newFixedThreadPool(1);
-
- if (!isStreamingHandOff) {
- init();
- }
- }
-
- private void init() {
- this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
- CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
- try {
- new RowsFetcher(false).call();
- if (prefetchEnabled) {
- this.fetchFuture = executorService.submit(new RowsFetcher(true));
- }
- } catch (Exception e) {
- LOGGER.error(e, "Error occurs while fetching records");
- throw new RuntimeException(e);
- }
- }
-
/**
- * fetch rows
+ * Counter to maintain the row counter.
*/
- private final class RowsFetcher implements Callable<Void> {
- private boolean isBackupFilling;
+ private int counter = 0;
- private RowsFetcher(boolean isBackupFilling) {
- this.isBackupFilling = isBackupFilling;
- }
-
- @Override
- public Void call() throws Exception {
- if (isBackupFilling) {
- backupBuffer = fetchRows();
- isBackupFilled = true;
- } else {
- currentBuffer = fetchRows();
- }
- return null;
- }
- }
-
- private List<Object[]> fetchRows() {
- if (detailRawQueryResultIterator.hasNext()) {
- return detailRawQueryResultIterator.next().getRows();
- } else {
- return new ArrayList<>();
- }
- }
-
- private void fillDataFromPrefetch() {
- try {
- if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
- if (prefetchEnabled) {
- if (!isBackupFilled) {
- fetchFuture.get();
- }
- // copy backup buffer to current buffer and fill backup buffer asyn
- currentIdxInBuffer = 0;
- currentBuffer = backupBuffer;
- isBackupFilled = false;
- fetchFuture = executorService.submit(new RowsFetcher(true));
- } else {
- currentIdxInBuffer = 0;
- new RowsFetcher(false).call();
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ private Object[] currentConveretedRawRow = null;
/**
- * populate a row with index counter increased
+ * LOGGER
*/
- private void popRow() {
- fillDataFromPrefetch();
- currentRawRow = currentBuffer.get(currentIdxInBuffer);
- currentIdxInBuffer++;
- }
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RawResultIterator.class.getName());
/**
- * populate a row with index counter unchanged
+ * batch of the result.
*/
- private void pickRow() {
- fillDataFromPrefetch();
- currentRawRow = currentBuffer.get(currentIdxInBuffer);
+ private RowBatch batch;
+
+ public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
+ SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+ this.detailRawQueryResultIterator = detailRawQueryResultIterator;
+ this.sourceSegProperties = sourceSegProperties;
+ this.destinationSegProperties = destinationSegProperties;
}
@Override
public boolean hasNext() {
- fillDataFromPrefetch();
- if (currentIdxInBuffer < currentBuffer.size()) {
+ if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
+ if (detailRawQueryResultIterator.hasNext()) {
+ batch = null;
+ batch = detailRawQueryResultIterator.next();
+ counter = 0; // batch changed so reset the counter.
+ } else {
+ return false;
+ }
+ }
+ if (!checkIfBatchIsProcessedCompletely(batch)) {
return true;
+ } else {
+ return false;
}
-
- return false;
}
@Override
public Object[] next() {
+ if (null == batch) { // for 1st time
+ batch = detailRawQueryResultIterator.next();
+ }
+ if (!checkIfBatchIsProcessedCompletely(batch)) {
+ try {
+ if (null != currentConveretedRawRow) {
+ counter++;
+ Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+ currentConveretedRawRow = null;
+ return currentConveretedRawRowTemp;
+ }
+ return convertRow(batch.getRawRow(counter++));
+ } catch (KeyGenException e) {
+ LOGGER.error(e.getMessage());
+ return null;
+ }
+ } else { // completed one batch.
+ batch = null;
+ batch = detailRawQueryResultIterator.next();
+ counter = 0;
+ }
try {
- popRow();
- return convertRow(this.currentRawRow);
+ if (null != currentConveretedRawRow) {
+ counter++;
+ Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+ currentConveretedRawRow = null;
+ return currentConveretedRawRowTemp;
+ }
+ return convertRow(batch.getRawRow(counter++));
} catch (KeyGenException e) {
- throw new RuntimeException(e);
+ LOGGER.error(e.getMessage());
+ return null;
}
+
}
/**
@@ -184,22 +125,38 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
* @return
*/
public Object[] fetchConverted() throws KeyGenException {
- pickRow();
- return convertRow(this.currentRawRow);
+ if (null != currentConveretedRawRow) {
+ return currentConveretedRawRow;
+ }
+ if (hasNext()) {
+ Object[] rawRow = batch.getRawRow(counter);
+ currentConveretedRawRow = convertRow(rawRow);
+ return currentConveretedRawRow;
+ }
+ else {
+ return null;
+ }
}
private Object[] convertRow(Object[] rawRow) throws KeyGenException {
byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
- byte[] convertedBytes =
+ byte[] covertedBytes =
destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
- ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(convertedBytes);
+ ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes);
return rawRow;
}
- public void close() {
- if (null != executorService) {
- executorService.shutdownNow();
+ /**
+ * To check if the batch is processed completely
+ * @param batch
+ * @return
+ */
+ private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
+ if (counter < batch.getSize()) {
+ return false;
+ } else {
+ return true;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/45b8d18b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 001f6c0..dbf87a3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -48,6 +48,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
override def afterAll = {
sql("DROP TABLE IF EXISTS main")
+ sql("set carbon.search.enabled = false")
sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
}
@@ -117,6 +118,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
sql(s"SELECT * FROM main WHERE id='100000'"))
sql("DROP DATAMAP if exists dm ON TABLE main")
+ sql("set carbon.search.enabled = false")
}
test("test lucene datamap with search mode 2") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/45b8d18b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 1f3decc..d21197c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -75,7 +75,7 @@ class HandoffPartition(
*/
class StreamingRawResultIterator(
recordReader: CarbonStreamRecordReader
-) extends RawResultIterator(null, null, null, true) {
+) extends RawResultIterator(null, null, null) {
override def hasNext: Boolean = {
recordReader.nextKeyValue()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/45b8d18b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index a347313..b0711ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -132,7 +132,7 @@ public class CarbonCompactionExecutor {
queryModel.setTableBlockInfos(list);
resultList.add(
new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties,
- destinationSegProperties, false));
+ destinationSegProperties));
}
}
return resultList;