You are viewing a plain-text version of this content; the link to the canonical version ("here" in the original page) was not preserved in this copy.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/20 13:06:49 UTC

[1/2] incubator-carbondata git commit: Fixed: 1. Data mismatch issue 2. Memory Leak issue

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 278ed4d54 -> bf76777d8


Fixed:
1. Data mismatch issue
2. Memory leak issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e2408551
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e2408551
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e2408551

Branch: refs/heads/master
Commit: e2408551aca873b632b3eaaf6ad4d2ac52ebc8ee
Parents: 278ed4d
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Apr 20 17:55:12 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 20 18:35:27 2017 +0530

----------------------------------------------------------------------
 .../core/memory/UnsafeMemoryAllocator.java      |  2 +
 .../processor/AbstractDataBlockIterator.java    |  7 +--
 .../apache/carbondata/core/util/NodeHolder.java | 13 ++++
 .../store/CarbonFactDataHandlerColumnar.java    | 11 ++--
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 62 +++++++++++++++-----
 5 files changed, 71 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
index b81a7ba..db38ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
@@ -26,6 +26,8 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
   @Override
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     long address = CarbonUnsafe.unsafe.allocateMemory(size);
+    // initializing memory with zero
+    CarbonUnsafe.unsafe.setMemory(null, address, size, (byte) 0);
     return new MemoryBlock(null, address, size);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index aec4933..fdb5483 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -108,15 +108,14 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
     if (scannedResult != null && scannedResult.hasNext()) {
       return true;
     } else {
+      if (null != scannedResult) {
+        scannedResult.freeMemory();
+      }
       return dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get();
     }
   }
 
   protected boolean updateScanner() {
-    // clear the current result if all the records are processed
-    if (scannedResult != null && !scannedResult.hasNext()) {
-      scannedResult.freeMemory();
-    }
     try {
       if (scannedResult != null && scannedResult.hasNext()) {
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index f80a08d..69ed9f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -156,6 +156,12 @@ public class NodeHolder {
   private int holderSize;
 
   /**
+   * to check all the pages to be
+   * written, this will be used for v3 format
+   */
+  private boolean writeAll;
+
+  /**
    * @return the keyArray
    */
   public byte[][] getKeyArray() {
@@ -440,4 +446,11 @@ public class NodeHolder {
   public void setHolderSize(int holderSize) {
     this.holderSize = holderSize;
   }
+
+  public void setWriteAll(boolean writeAll) {
+    this.writeAll = writeAll;
+  }
+  public boolean isWriteAll() {
+    return this.writeAll;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 0e6a49d..7a80f72 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -463,8 +463,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     if (this.entryCount == this.blockletSize) {
       try {
         semaphore.acquire();
-        producerExecutorServiceTaskList.add(producerExecutorService
-            .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter)));
+        producerExecutorServiceTaskList.add(producerExecutorService.submit(
+            new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)));
         blockletProcessingCount.incrementAndGet();
         // set the entry count to zero
         processedDataCount += entryCount;
@@ -861,7 +861,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     // than 0
     if (this.entryCount > 0) {
       producerExecutorServiceTaskList.add(producerExecutorService
-          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter)));
+          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
       blockletProcessingCount.incrementAndGet();
       processedDataCount += entryCount;
     }
@@ -1330,12 +1330,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     private BlockletDataHolder blockletDataHolder;
     private List<Object[]> dataRows;
     private int sequenceNumber;
+    private boolean isWriteAll;
 
     private Producer(BlockletDataHolder blockletDataHolder, List<Object[]> dataRows,
-        int sequenceNumber) {
+        int sequenceNumber, boolean isWriteAll) {
       this.blockletDataHolder = blockletDataHolder;
       this.dataRows = dataRows;
       this.sequenceNumber = sequenceNumber;
+      this.isWriteAll = isWriteAll;
     }
 
     /**
@@ -1347,6 +1349,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     @Override public Void call() throws Exception {
       try {
         NodeHolder nodeHolder = processDataRows(dataRows);
+        nodeHolder.setWriteAll(isWriteAll);
         // insert the object in array according to sequence number
         int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
         blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index aa54a4b..6f05b69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -314,11 +314,39 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
   @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
     // check the number of pages present in data holder, if pages is exceeding threshold
     // it will write the pages to file
-    if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize) {
-      LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
-      writeDataToFile(fileChannel);
+    // condition for writting all the pages
+    if (!holder.isWriteAll()) {
+      boolean isAdded = false;
+      // check if size more than blocklet size then write the page
+      if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize) {
+        // if one page size is more than blocklet size
+        if (dataWriterHolder.getNodeHolder().size() == 0) {
+          isAdded = true;
+          dataWriterHolder.addNodeHolder(holder);
+        }
+        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+        // write the data
+        writeDataToFile(fileChannel);
+      }
+      if (!isAdded) {
+        dataWriterHolder.addNodeHolder(holder);
+      }
+    } else {
+      //for last blocklet check if the last page will exceed the blocklet size then write
+      // existing pages and then last page
+      if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize
+          && dataWriterHolder.getNodeHolder().size() > 0) {
+        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+        writeDataToFile(fileChannel);
+        dataWriterHolder.addNodeHolder(holder);
+        LOGGER.info("Number of Pages for blocklet is: " + "1");
+        writeDataToFile(fileChannel);
+      } else {
+        dataWriterHolder.addNodeHolder(holder);
+        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+        writeDataToFile(fileChannel);
+      }
     }
-    dataWriterHolder.addNodeHolder(holder);
   }
 
   private void writeDataToFile(FileChannel channel) {
@@ -538,19 +566,21 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    if (dataWriterHolder.getNodeHolder().size() > 0) {
-      writeDataToFile(fileChannel);
-      writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
-      CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-      renameCarbonDataFile();
-      copyCarbonDataFileToCarbonStorePath(
-          this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
-      try {
-        writeIndexFile();
-      } catch (IOException e) {
-        throw new CarbonDataWriterException("Problem while writing the index file", e);
-      }
+    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+    renameCarbonDataFile();
+    copyCarbonDataFileToCarbonStorePath(
+        this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+    try {
+      writeIndexFile();
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the index file", e);
     }
     closeExecutorService();
   }
+
+  @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
+    if (this.blockletMetadata.size() > 0) {
+      writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
+    }
+  }
 }


[2/2] incubator-carbondata git commit: [CARBONDATA-963] Fixed data mismatch issue and memory leak issue. This closes #820

Posted by ra...@apache.org.
[CARBONDATA-963] Fixed data mismatch issue and memory leak issue. This closes #820


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bf76777d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bf76777d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bf76777d

Branch: refs/heads/master
Commit: bf76777d822cf785fb8f9e3338499cf991f2af77
Parents: 278ed4d e240855
Author: ravipesala <ra...@gmail.com>
Authored: Thu Apr 20 18:36:24 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 20 18:36:24 2017 +0530

----------------------------------------------------------------------
 .../core/memory/UnsafeMemoryAllocator.java      |  2 +
 .../processor/AbstractDataBlockIterator.java    |  7 +--
 .../apache/carbondata/core/util/NodeHolder.java | 13 ++++
 .../store/CarbonFactDataHandlerColumnar.java    | 11 ++--
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 62 +++++++++++++++-----
 5 files changed, 71 insertions(+), 24 deletions(-)
----------------------------------------------------------------------