You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/18 08:45:00 UTC
[2/3] carbondata git commit: Fixed batch load issue count and synchronization
Fixed batch load issue count and synchronization
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f7015212
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f7015212
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f7015212
Branch: refs/heads/branch-1.1
Commit: f7015212d10c73c287e28640cb4545158b1ad318
Parents: a6468f7
Author: ravipesala <ra...@gmail.com>
Authored: Thu Jun 15 16:32:09 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:13:48 2017 +0530
----------------------------------------------------------------------
.../sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7015212/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c3243b6..cc7929d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -197,7 +197,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
ThreadStatusObserver threadStatusObserver) {
this.sortParameters = sortParameters.getCopy();
this.iteratorCount = new AtomicInteger(numberOfThreads);
- this.mergerQueue = new LinkedBlockingQueue<>();
+ this.mergerQueue = new LinkedBlockingQueue<>(1);
this.threadStatusObserver = threadStatusObserver;
createSortDataRows();
}
@@ -254,6 +254,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
.getThrowable() instanceof CarbonDataLoadingException) {
finalMerger.setStopProcess(true);
mergerQueue.put(finalMerger);
+ return;
}
processRowToNextStep(sortDataRow, sortParameters);
unsafeIntermediateFileMerger.finish();
@@ -270,6 +271,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
} catch (CarbonSortKeyAndGroupByException e) {
throw new CarbonDataLoadingException(e);
} catch (InterruptedException e) {
+ // If putting into the queue fails because of an InterruptedException, use the non-blocking
+ // offer() instead so that the main thread is freed from waiting.
+ if (finalMerger != null) {
+ finalMerger.setStopProcess(true);
+ mergerQueue.offer(finalMerger);
+ }
throw new CarbonDataLoadingException(e);
}
}