You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/14 04:18:01 UTC
[02/13] carbondata git commit: [CARBONDATA-2232][DataLoad] Fix
incorrect logic in spilling unsafe pages to disk
[CARBONDATA-2232][DataLoad] Fix incorrect logic in spilling unsafe pages to disk
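An unsafe row page should be spilled to disk only when unsafe sort memory is
unavailable. The two row-adding paths had this condition inverted (they spilled
when memory was available), while initialize() had it right. All three now
share a single createUnsafeRowPage() helper that uses the correct condition.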
This closes #2037
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a161841e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a161841e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a161841e
Branch: refs/heads/master
Commit: a161841e8808a3a477715346c8b28e683a5bc4d7
Parents: b509ad8
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Fri Mar 9 09:55:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:09:57 2018 +0800
----------------------------------------------------------------------
.../loading/sort/unsafe/UnsafeSortDataRows.java | 37 ++++++++------------
1 file changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a161841e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index eaa858e..7afda0e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -129,14 +129,7 @@ public class UnsafeSortDataRows {
* This method will be used to initialize
*/
public void initialize() throws MemoryException {
- MemoryBlock baseBlock =
- UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
- boolean isMemoryAvailable =
- UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
- if (isMemoryAvailable) {
- UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
- }
- this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+ this.rowPage = createUnsafeRowPage();
// Delete if any older file exists in sort temp folder
deleteSortLocationIfExists();
@@ -148,6 +141,17 @@ public class UnsafeSortDataRows {
semaphore = new Semaphore(parameters.getNumberOfCores());
}
+ private UnsafeCarbonRowPage createUnsafeRowPage() throws MemoryException {
+ MemoryBlock baseBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean isMemoryAvailable =
+ UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+ if (isMemoryAvailable) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+ }
+ return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+ }
+
public boolean canAdd() {
return bytesAdded < maxSizeAllowed;
}
@@ -192,14 +196,7 @@ public class UnsafeSortDataRows {
unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
semaphore.acquire();
dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock =
- UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
- boolean saveToDisk =
- UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
- if (!saveToDisk) {
- UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
- }
- rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+ rowPage = createUnsafeRowPage();
bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
} catch (Exception e) {
LOGGER.error(
@@ -227,13 +224,7 @@ public class UnsafeSortDataRows {
unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
semaphore.acquire();
dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock =
- UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
- boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
- if (!saveToDisk) {
- UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
- }
- rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+ rowPage = createUnsafeRowPage();
rowPage.addRow(row, rowBuffer.get());
} catch (Exception e) {
LOGGER.error(