You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/07/06 14:41:58 UTC
[33/50] [abbrv] carbondata git commit: [CARBONDATA-1177]Fixed batch sort synchronization issue
[CARBONDATA-1177]Fixed batch sort synchronization issue
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a6468f73
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a6468f73
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a6468f73
Branch: refs/heads/branch-1.1
Commit: a6468f73bf74a2afb0a4d2c97664e127f91d69bd
Parents: c05523d
Author: dhatchayani <dh...@gmail.com>
Authored: Thu Jun 15 10:03:08 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:13:37 2017 +0530
----------------------------------------------------------------------
.../UnsafeBatchParallelReadMergeSorterImpl.java | 36 ++++++++++++++++----
1 file changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6468f73/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index f1b4a80..c3243b6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.processing.newflow.sort.impl;
+import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -44,6 +46,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleT
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
/**
* It parallely reads data from array of iterates and do merge sort.
@@ -184,11 +187,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
private AtomicInteger iteratorCount;
+ private int batchCount;
+
private ThreadStatusObserver threadStatusObserver;
+ private final Object lock = new Object();
+
public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
ThreadStatusObserver threadStatusObserver) {
- this.sortParameters = sortParameters;
+ this.sortParameters = sortParameters.getCopy();
this.iteratorCount = new AtomicInteger(numberOfThreads);
this.mergerQueue = new LinkedBlockingQueue<>();
this.threadStatusObserver = threadStatusObserver;
@@ -197,6 +204,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
private void createSortDataRows() {
int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ setTempLocation(sortParameters);
this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
sortParameters.getTempFileLocation());
unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
@@ -208,6 +216,16 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
} catch (CarbonSortKeyAndGroupByException e) {
throw new CarbonDataLoadingException(e);
}
+ batchCount++;
+ }
+
+ private void setTempLocation(SortParameters parameters) {
+ String carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getDatabaseName(),
+ parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
+ parameters.getSegmentId(), false);
+ parameters.setTempFileLocation(
+ carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
}
@Override public UnsafeSingleThreadFinalSortFilesMerger next() {
@@ -235,7 +253,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
&& threadStatusObserver.getThrowable() != null && threadStatusObserver
.getThrowable() instanceof CarbonDataLoadingException) {
finalMerger.setStopProcess(true);
- mergerQueue.offer(finalMerger);
+ mergerQueue.put(finalMerger);
}
processRowToNextStep(sortDataRow, sortParameters);
unsafeIntermediateFileMerger.finish();
@@ -243,7 +261,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
unsafeIntermediateFileMerger.getMergedPages());
unsafeIntermediateFileMerger.close();
- mergerQueue.offer(finalMerger);
+ mergerQueue.put(finalMerger);
sortDataRow = null;
unsafeIntermediateFileMerger = null;
finalMerger = null;
@@ -251,16 +269,20 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
throw new CarbonDataLoadingException(e);
} catch (CarbonSortKeyAndGroupByException e) {
throw new CarbonDataLoadingException(e);
+ } catch (InterruptedException e) {
+ throw new CarbonDataLoadingException(e);
}
}
- public synchronized void finishThread() {
- if (iteratorCount.decrementAndGet() <= 0) {
- finish();
+ public void finishThread() {
+ synchronized (lock) {
+ if (iteratorCount.decrementAndGet() <= 0) {
+ finish();
+ }
}
}
- public synchronized boolean hasNext() {
+ public boolean hasNext() {
return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
}