You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/18 08:45:00 UTC

[2/3] carbondata git commit: Fixed batch load issue count and synchronization

Fixed batch load issue count and synchronization


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f7015212
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f7015212
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f7015212

Branch: refs/heads/branch-1.1
Commit: f7015212d10c73c287e28640cb4545158b1ad318
Parents: a6468f7
Author: ravipesala <ra...@gmail.com>
Authored: Thu Jun 15 16:32:09 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:13:48 2017 +0530

----------------------------------------------------------------------
 .../sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7015212/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c3243b6..cc7929d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -197,7 +197,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         ThreadStatusObserver threadStatusObserver) {
       this.sortParameters = sortParameters.getCopy();
       this.iteratorCount = new AtomicInteger(numberOfThreads);
-      this.mergerQueue = new LinkedBlockingQueue<>();
+      this.mergerQueue = new LinkedBlockingQueue<>(1);
       this.threadStatusObserver = threadStatusObserver;
       createSortDataRows();
     }
@@ -254,6 +254,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
           mergerQueue.put(finalMerger);
+          return;
         }
         processRowToNextStep(sortDataRow, sortParameters);
         unsafeIntermediateFileMerger.finish();
@@ -270,6 +271,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       } catch (CarbonSortKeyAndGroupByException e) {
         throw new CarbonDataLoadingException(e);
       } catch (InterruptedException e) {
+        // if fails to put in queue because of interrupted exception, we can offer to free the main
+        // thread from waiting.
+        if (finalMerger != null) {
+          finalMerger.setStopProcess(true);
+          mergerQueue.offer(finalMerger);
+        }
         throw new CarbonDataLoadingException(e);
       }
     }