You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/24 03:06:22 UTC
[1/2] carbondata git commit: [CARBONDATA-1223] Fixing empty file creation in batch sort loading
Repository: carbondata
Updated Branches:
refs/heads/master 30ef14e0d -> 4a6f57ebf
[CARBONDATA-1223] Fixing empty file creation in batch sort loading
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0205fa69
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0205fa69
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0205fa69
Branch: refs/heads/master
Commit: 0205fa6991e2b1d3f2a807121d15a6eeb9f07714
Parents: 30ef14e
Author: dhatchayani <dh...@gmail.com>
Authored: Fri Jun 23 19:24:47 2017 +0530
Committer: dhatchayani <dh...@gmail.com>
Committed: Fri Jun 23 19:24:51 2017 +0530
----------------------------------------------------------------------
.../UnsafeBatchParallelReadMergeSorterImpl.java | 16 +++++++++---
.../UnsafeSingleThreadFinalSortFilesMerger.java | 26 --------------------
.../steps/DataWriterBatchProcessorStepImpl.java | 18 ++++++++------
3 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 20a560d..a8d1eef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -155,7 +155,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
rowCounter.getAndAdd(i);
if (!sortDataRows.getSortDataRow().canAdd()) {
- sortDataRows.finish();
+ sortDataRows.finish(false);
sortDataRows.createSortDataRows();
}
}
@@ -246,7 +246,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
return sortDataRow;
}
- public void finish() {
+ public void finish(boolean isFinalAttempt) {
try {
// if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
// then set stop process to true in the finalmerger instance
@@ -254,6 +254,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
&& threadStatusObserver.getThrowable() != null && threadStatusObserver
.getThrowable() instanceof CarbonDataLoadingException) {
finalMerger.setStopProcess(true);
+ if (isFinalAttempt) {
+ iteratorCount.decrementAndGet();
+ }
mergerQueue.put(finalMerger);
return;
}
@@ -263,6 +266,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
unsafeIntermediateFileMerger.getMergedPages());
unsafeIntermediateFileMerger.close();
+ if (isFinalAttempt) {
+ iteratorCount.decrementAndGet();
+ }
mergerQueue.put(finalMerger);
sortDataRow = null;
unsafeIntermediateFileMerger = null;
@@ -284,8 +290,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
public void finishThread() {
synchronized (lock) {
- if (iteratorCount.decrementAndGet() <= 0) {
- finish();
+ if (iteratorCount.get() <= 1) {
+ finish(true);
+ } else {
+ iteratorCount.decrementAndGet();
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index acb976f..eb7af47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -55,25 +55,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private SortParameters parameters;
/**
- * number of measures
- */
- private int measureCount;
-
- /**
- * number of dimensionCount
- */
- private int dimensionCount;
-
- /**
- * number of complexDimensionCount
- */
- private int noDictionaryCount;
-
- private int complexDimensionCount;
-
- private boolean[] isNoDictionaryDimensionColumn;
-
- /**
* tempFileLocation
*/
private String tempFileLocation;
@@ -85,13 +66,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
String tempFileLocation) {
this.parameters = parameters;
- // set measure and dimension count
- this.measureCount = parameters.getMeasureColCount();
- this.dimensionCount = parameters.getDimColCount();
- this.complexDimensionCount = parameters.getComplexDimColCount();
-
- this.noDictionaryCount = parameters.getNoDictionaryCount();
- this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
this.tempFileLocation = tempFileLocation;
this.tableName = parameters.getTableName();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index d58835c..46c1020 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -82,13 +82,16 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
int k = 0;
while (iterator.hasNext()) {
CarbonRowBatch next = iterator.next();
- CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
- .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
- CarbonFactHandler dataHandler = CarbonFactHandlerFactory
- .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
- dataHandler.initialise();
- processBatch(next, dataHandler);
- finish(tableName, dataHandler);
+ // If no rows from merge sorter, then don't create a file in fact column handler
+ if (next.hasNext()) {
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+ .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+ CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+ .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+ dataHandler.initialise();
+ processBatch(next, dataHandler);
+ finish(tableName, dataHandler);
+ }
}
i++;
}
@@ -137,6 +140,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
dataHandler.addDataToStore(row);
batchSize++;
}
+ batch.close();
rowCounter.getAndAdd(batchSize);
}
[2/2] carbondata git commit: [CARBONDATA-1223] Fixing empty file creation in batch sort loading. This closes #1087
Posted by ra...@apache.org.
[CARBONDATA-1223] Fixing empty file creation in batch sort loading. This closes #1087
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a6f57eb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a6f57eb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a6f57eb
Branch: refs/heads/master
Commit: 4a6f57ebfdc64d04d8b2a5df9aea0dc340a30ba0
Parents: 30ef14e 0205fa6
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jun 24 08:35:59 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Jun 24 08:35:59 2017 +0530
----------------------------------------------------------------------
.../UnsafeBatchParallelReadMergeSorterImpl.java | 16 +++++++++---
.../UnsafeSingleThreadFinalSortFilesMerger.java | 26 --------------------
.../steps/DataWriterBatchProcessorStepImpl.java | 18 ++++++++------
3 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------