You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:36 UTC

[30/56] [abbrv] carbondata git commit: [CARBONDATA-1177] Fixed batch sort synchronization issue

[CARBONDATA-1177] Fixed batch sort synchronization issue


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/00e0f7e7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/00e0f7e7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/00e0f7e7

Branch: refs/heads/streaming_ingest
Commit: 00e0f7e733f944ef8e3b81cec619a7be732a3f86
Parents: bce15da
Author: dhatchayani <dh...@gmail.com>
Authored: Thu Jun 15 10:03:08 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 17:41:27 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 36 ++++++++++++++++----
 1 file changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/00e0f7e7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 305a2ee..eebaa67 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.processing.newflow.sort.impl;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -45,6 +47,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeInterme
 import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * It parallely reads data from array of iterates and do merge sort.
@@ -185,11 +188,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     private AtomicInteger iteratorCount;
 
+    private int batchCount;
+
     private ThreadStatusObserver threadStatusObserver;
 
+    private final Object lock = new Object();
+
     public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
         ThreadStatusObserver threadStatusObserver) {
-      this.sortParameters = sortParameters;
+      this.sortParameters = sortParameters.getCopy();
       this.iteratorCount = new AtomicInteger(numberOfThreads);
       this.mergerQueue = new LinkedBlockingQueue<>();
       this.threadStatusObserver = threadStatusObserver;
@@ -198,6 +205,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     private void createSortDataRows() {
       int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+      setTempLocation(sortParameters);
       this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
           sortParameters.getTempFileLocation());
       unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
@@ -209,6 +217,16 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       } catch (MemoryException e) {
         throw new CarbonDataLoadingException(e);
       }
+      batchCount++;
+    }
+
+    private void setTempLocation(SortParameters parameters) {
+      String carbonDataDirectoryPath = CarbonDataProcessorUtil
+          .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
+            parameters.getSegmentId(), false);
+      parameters.setTempFileLocation(
+          carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     }
 
     @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
@@ -236,7 +254,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             && threadStatusObserver.getThrowable() != null && threadStatusObserver
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
-          mergerQueue.offer(finalMerger);
+          mergerQueue.put(finalMerger);
         }
         processRowToNextStep(sortDataRow, sortParameters);
         unsafeIntermediateFileMerger.finish();
@@ -244,7 +262,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             unsafeIntermediateFileMerger.getMergedPages());
         unsafeIntermediateFileMerger.close();
-        mergerQueue.offer(finalMerger);
+        mergerQueue.put(finalMerger);
         sortDataRow = null;
         unsafeIntermediateFileMerger = null;
         finalMerger = null;
@@ -252,16 +270,20 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         throw new CarbonDataLoadingException(e);
       } catch (CarbonSortKeyAndGroupByException e) {
         throw new CarbonDataLoadingException(e);
+      } catch (InterruptedException e) {
+        throw new CarbonDataLoadingException(e);
       }
     }
 
-    public synchronized void finishThread() {
-      if (iteratorCount.decrementAndGet() <= 0) {
-        finish();
+    public void finishThread() {
+      synchronized (lock) {
+        if (iteratorCount.decrementAndGet() <= 0) {
+          finish();
+        }
       }
     }
 
-    public synchronized boolean hasNext() {
+    public boolean hasNext() {
       return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
     }