You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/14 04:18:01 UTC

[02/13] carbondata git commit: [CARBONDATA-2232][DataLoad] Fix incorrect logic in spilling unsafe pages to disk

[CARBONDATA-2232][DataLoad] Fix incorrect logic in spilling unsafe pages to disk

The unsafe row page will only be written to disk if the memory is
unavailable -- the previous logic just reversed it.

This closes #2037


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a161841e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a161841e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a161841e

Branch: refs/heads/master
Commit: a161841e8808a3a477715346c8b28e683a5bc4d7
Parents: b509ad8
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Fri Mar 9 09:55:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:09:57 2018 +0800

----------------------------------------------------------------------
 .../loading/sort/unsafe/UnsafeSortDataRows.java | 37 ++++++++------------
 1 file changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a161841e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index eaa858e..7afda0e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -129,14 +129,7 @@ public class UnsafeSortDataRows {
    * This method will be used to initialize
    */
   public void initialize() throws MemoryException {
-    MemoryBlock baseBlock =
-        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-    boolean isMemoryAvailable =
-        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
-    if (isMemoryAvailable) {
-      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
-    }
-    this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+    this.rowPage = createUnsafeRowPage();
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -148,6 +141,17 @@ public class UnsafeSortDataRows {
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
+  private UnsafeCarbonRowPage createUnsafeRowPage() throws MemoryException {
+    MemoryBlock baseBlock =
+        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+    boolean isMemoryAvailable =
+        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+    if (isMemoryAvailable) {
+      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+    }
+    return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+  }
+
   public boolean canAdd() {
     return bytesAdded < maxSizeAllowed;
   }
@@ -192,14 +196,7 @@ public class UnsafeSortDataRows {
           unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
           semaphore.acquire();
           dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
-          MemoryBlock memoryBlock =
-              UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-          boolean saveToDisk =
-              UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
-          if (!saveToDisk) {
-            UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
-          }
-          rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+          rowPage = createUnsafeRowPage();
           bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
         } catch (Exception e) {
           LOGGER.error(
@@ -227,13 +224,7 @@ public class UnsafeSortDataRows {
         unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
         semaphore.acquire();
         dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-        MemoryBlock memoryBlock =
-            UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-        boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
-        if (!saveToDisk) {
-          UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
-        }
-        rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+        rowPage = createUnsafeRowPage();
         rowPage.addRow(row, rowBuffer.get());
       } catch (Exception e) {
         LOGGER.error(