You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/20 13:06:49 UTC
[1/2] incubator-carbondata git commit: Fixed: 1. Data mismatch issue
2. Memory Leak issue
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 278ed4d54 -> bf76777d8
Fixed:
1. Data mismatch issue
2. Memory Leak issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e2408551
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e2408551
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e2408551
Branch: refs/heads/master
Commit: e2408551aca873b632b3eaaf6ad4d2ac52ebc8ee
Parents: 278ed4d
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Apr 20 17:55:12 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 20 18:35:27 2017 +0530
----------------------------------------------------------------------
.../core/memory/UnsafeMemoryAllocator.java | 2 +
.../processor/AbstractDataBlockIterator.java | 7 +--
.../apache/carbondata/core/util/NodeHolder.java | 13 ++++
.../store/CarbonFactDataHandlerColumnar.java | 11 ++--
.../writer/v3/CarbonFactDataWriterImplV3.java | 62 +++++++++++++++-----
5 files changed, 71 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
index b81a7ba..db38ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
@@ -26,6 +26,8 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = CarbonUnsafe.unsafe.allocateMemory(size);
+ // initializing memory with zero
+ CarbonUnsafe.unsafe.setMemory(null, address, size, (byte) 0);
return new MemoryBlock(null, address, size);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index aec4933..fdb5483 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -108,15 +108,14 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
if (scannedResult != null && scannedResult.hasNext()) {
return true;
} else {
+ if (null != scannedResult) {
+ scannedResult.freeMemory();
+ }
return dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get();
}
}
protected boolean updateScanner() {
- // clear the current result if all the records are processed
- if (scannedResult != null && !scannedResult.hasNext()) {
- scannedResult.freeMemory();
- }
try {
if (scannedResult != null && scannedResult.hasNext()) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index f80a08d..69ed9f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -156,6 +156,12 @@ public class NodeHolder {
private int holderSize;
/**
+ * flag indicating that all the pages must be
+ * written; this is used for the v3 format
+ */
+ private boolean writeAll;
+
+ /**
* @return the keyArray
*/
public byte[][] getKeyArray() {
@@ -440,4 +446,11 @@ public class NodeHolder {
public void setHolderSize(int holderSize) {
this.holderSize = holderSize;
}
+
+ public void setWriteAll(boolean writeAll) {
+ this.writeAll = writeAll;
+ }
+ public boolean isWriteAll() {
+ return this.writeAll;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 0e6a49d..7a80f72 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -463,8 +463,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
if (this.entryCount == this.blockletSize) {
try {
semaphore.acquire();
- producerExecutorServiceTaskList.add(producerExecutorService
- .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter)));
+ producerExecutorServiceTaskList.add(producerExecutorService.submit(
+ new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)));
blockletProcessingCount.incrementAndGet();
// set the entry count to zero
processedDataCount += entryCount;
@@ -861,7 +861,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// than 0
if (this.entryCount > 0) {
producerExecutorServiceTaskList.add(producerExecutorService
- .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter)));
+ .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
blockletProcessingCount.incrementAndGet();
processedDataCount += entryCount;
}
@@ -1330,12 +1330,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private BlockletDataHolder blockletDataHolder;
private List<Object[]> dataRows;
private int sequenceNumber;
+ private boolean isWriteAll;
private Producer(BlockletDataHolder blockletDataHolder, List<Object[]> dataRows,
- int sequenceNumber) {
+ int sequenceNumber, boolean isWriteAll) {
this.blockletDataHolder = blockletDataHolder;
this.dataRows = dataRows;
this.sequenceNumber = sequenceNumber;
+ this.isWriteAll = isWriteAll;
}
/**
@@ -1347,6 +1349,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
@Override public Void call() throws Exception {
try {
NodeHolder nodeHolder = processDataRows(dataRows);
+ nodeHolder.setWriteAll(isWriteAll);
// insert the object in array according to sequence number
int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e2408551/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index aa54a4b..6f05b69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -314,11 +314,39 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
@Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
// check the number of pages present in data holder, if pages is exceeding threshold
// it will write the pages to file
- if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize) {
- LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
- writeDataToFile(fileChannel);
+ // condition for writing all the pages
+ if (!holder.isWriteAll()) {
+ boolean isAdded = false;
+ // check if size more than blocklet size then write the page
+ if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize) {
+ // if one page size is more than blocklet size
+ if (dataWriterHolder.getNodeHolder().size() == 0) {
+ isAdded = true;
+ dataWriterHolder.addNodeHolder(holder);
+ }
+ LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+ // write the data
+ writeDataToFile(fileChannel);
+ }
+ if (!isAdded) {
+ dataWriterHolder.addNodeHolder(holder);
+ }
+ } else {
+ // for the last blocklet: if adding the last page would reach or exceed the blocklet size
+ // and pages are already buffered, write the existing pages first and then the last page
+ if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize
+ && dataWriterHolder.getNodeHolder().size() > 0) {
+ LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+ writeDataToFile(fileChannel);
+ dataWriterHolder.addNodeHolder(holder);
+ LOGGER.info("Number of Pages for blocklet is: " + "1");
+ writeDataToFile(fileChannel);
+ } else {
+ dataWriterHolder.addNodeHolder(holder);
+ LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+ writeDataToFile(fileChannel);
+ }
}
- dataWriterHolder.addNodeHolder(holder);
}
private void writeDataToFile(FileChannel channel) {
@@ -538,19 +566,21 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
* @throws CarbonDataWriterException
*/
public void closeWriter() throws CarbonDataWriterException {
- if (dataWriterHolder.getNodeHolder().size() > 0) {
- writeDataToFile(fileChannel);
- writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
- CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
- renameCarbonDataFile();
- copyCarbonDataFileToCarbonStorePath(
- this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
- try {
- writeIndexFile();
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while writing the index file", e);
- }
+ CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+ renameCarbonDataFile();
+ copyCarbonDataFileToCarbonStorePath(
+ this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+ try {
+ writeIndexFile();
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the index file", e);
}
closeExecutorService();
}
+
+ @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
+ if (this.blockletMetadata.size() > 0) {
+ writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
+ }
+ }
}
[2/2] incubator-carbondata git commit: [CARBONDATA-963]Fixed Data
mismatch issue and Memory Leak Issue. This closes #820
Posted by ra...@apache.org.
[CARBONDATA-963]Fixed Data mismatch issue and Memory Leak Issue. This closes #820
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bf76777d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bf76777d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bf76777d
Branch: refs/heads/master
Commit: bf76777d822cf785fb8f9e3338499cf991f2af77
Parents: 278ed4d e240855
Author: ravipesala <ra...@gmail.com>
Authored: Thu Apr 20 18:36:24 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 20 18:36:24 2017 +0530
----------------------------------------------------------------------
.../core/memory/UnsafeMemoryAllocator.java | 2 +
.../processor/AbstractDataBlockIterator.java | 7 +--
.../apache/carbondata/core/util/NodeHolder.java | 13 ++++
.../store/CarbonFactDataHandlerColumnar.java | 11 ++--
.../writer/v3/CarbonFactDataWriterImplV3.java | 62 +++++++++++++++-----
5 files changed, 71 insertions(+), 24 deletions(-)
----------------------------------------------------------------------