You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/12 19:49:52 UTC
[1/2] carbondata git commit: [CARBONDATA - 1159] Batch sort loading is not proper without synchronization
Repository: carbondata
Updated Branches:
refs/heads/master d1080df44 -> 0ad92b6a0
[CARBONDATA - 1159] Batch sort loading is not proper without synchronization
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/408de862
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/408de862
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/408de862
Branch: refs/heads/master
Commit: 408de862a0a47d56b9038d7354d5173567b43eda
Parents: d1080df
Author: dhatchayani <dh...@gmail.com>
Authored: Mon Jun 12 21:56:47 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 13 01:18:54 2017 +0530
----------------------------------------------------------------------
.../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +-
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 72 ++++++++++++--------
.../util/CarbonDataProcessorUtil.java | 7 +-
3 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/408de862/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 7121278..8c345cb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -147,9 +147,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
}
}
if (i > 0) {
- sortDataRows.getSortDataRow().addRowBatch(buffer, i);
- rowCounter.getAndAdd(i);
synchronized (sortDataRows) {
+ sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
+ rowCounter.getAndAdd(i);
if (!sortDataRows.getSortDataRow().canAdd()) {
sortDataRows.finish();
sortDataRows.createSortDataRows();
@@ -197,6 +197,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
private void createSortDataRows() {
int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ if (inMemoryChunkSizeInMB > sortParameters.getBatchSortSizeinMb()) {
+ inMemoryChunkSizeInMB = sortParameters.getBatchSortSizeinMb();
+ }
this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
sortParameters.getTempFileLocation());
unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/408de862/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 415d708..074bb3b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -166,35 +166,53 @@ public class UnsafeSortDataRows {
// if record holder list size is equal to sort buffer size then it will
// sort the list and then write current list data to file
synchronized (addRowsLock) {
- for (int i = 0; i < size; i++) {
- if (rowPage.canAdd()) {
- bytesAdded += rowPage.addRow(rowBatch[i]);
- } else {
- try {
- if (enableInMemoryIntermediateMerge) {
- unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
- }
- unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
- semaphore.acquire();
- dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
- boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
- rowPage = new UnsafeCarbonRowPage(
- parameters.getNoDictionaryDimnesionColumn(),
- parameters.getNoDictionarySortColumn(),
- parameters.getDimColCount() + parameters.getComplexDimColCount(),
- parameters.getMeasureColCount(),
- parameters.getMeasureDataType(),
- memoryBlock,
- saveToDisk);
- bytesAdded += rowPage.addRow(rowBatch[i]);
- } catch (Exception e) {
- LOGGER.error(
- "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
- throw new CarbonSortKeyAndGroupByException(e);
- }
+ addBatch(rowBatch, size);
+ }
+ }
+
+ /**
+ * This method will be used to add new row
+ *
+ * @param rowBatch new rowBatch
+ * @param size
+ * @throws CarbonSortKeyAndGroupByException problem while writing
+ */
+ public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
+ throws CarbonSortKeyAndGroupByException {
+ // if record holder list size is equal to sort buffer size then it will
+ // sort the list and then write current list data to file
+ addBatch(rowBatch, size);
+ }
+ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+ for (int i = 0; i < size; i++) {
+ if (rowPage.canAdd()) {
+ bytesAdded += rowPage.addRow(rowBatch[i]);
+ } else {
+ try {
+ if (enableInMemoryIntermediateMerge) {
+ unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+ }
+ unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+ semaphore.acquire();
+ dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+ MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+ boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(),
+ memoryBlock,
+ saveToDisk);
+ bytesAdded += rowPage.addRow(rowBatch[i]);
+ } catch (Exception e) {
+ LOGGER.error(
+ "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e);
}
+
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/408de862/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 1659be2..7bb915a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -444,9 +444,11 @@ public final class CarbonDataProcessorUtil {
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
.toString());
}
+ LOGGER.warn("sort scope is set to " + sortScope);
} catch (Exception e) {
sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
- LOGGER.warn("sort scope is set to " + sortScope);
+ LOGGER.warn("Exception occured while resolving sort scope. " +
+ "sort scope is set to " + sortScope);
}
return sortScope;
}
@@ -469,8 +471,11 @@ public final class CarbonDataProcessorUtil {
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)
.toString());
}
+ LOGGER.warn("batch sort size is set to " + batchSortSizeInMb);
} catch (Exception e) {
batchSortSizeInMb = 0;
+ LOGGER.warn("Exception occured while resolving batch sort size. " +
+ "batch sort size is set to " + batchSortSizeInMb);
}
return batchSortSizeInMb;
}
[2/2] carbondata git commit: [CARBONDATA - 1159] Batch sort loading is not proper without synchronization. This closes #1022
Posted by ra...@apache.org.
[CARBONDATA - 1159] Batch sort loading is not proper without synchronization. This closes #1022
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0ad92b6a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0ad92b6a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0ad92b6a
Branch: refs/heads/master
Commit: 0ad92b6a056d7bb206a4e82f1453d77be25583fe
Parents: d1080df 408de86
Author: ravipesala <ra...@gmail.com>
Authored: Tue Jun 13 01:19:27 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 13 01:19:27 2017 +0530
----------------------------------------------------------------------
.../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +-
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 72 ++++++++++++--------
.../util/CarbonDataProcessorUtil.java | 7 +-
3 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------