You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/02/17 14:09:09 UTC
[1/2] incubator-carbondata git commit: fix memory issue for data loading
Repository: incubator-carbondata
Updated Branches:
refs/heads/master b67760184 -> a6c8d2a79
fix memory issue for data loading
fix comment
fix comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c5aba5f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c5aba5f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c5aba5f5
Branch: refs/heads/master
Commit: c5aba5f518a8ea5d5907273c1782df4f9015746a
Parents: b677601
Author: QiangCai <qi...@qq.com>
Authored: Thu Feb 9 00:22:03 2017 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Feb 17 19:37:32 2017 +0530
----------------------------------------------------------------------
conf/carbon.properties.template | 4 ++-
.../core/constants/CarbonCommonConstants.java | 9 ++++--
.../carbondata/core/util/CarbonProperties.java | 21 +++++++++++++
.../processing/csvload/CSVInputFormat.java | 12 ++++++++
.../newflow/AbstractDataLoadProcessorStep.java | 7 ++---
.../processing/newflow/row/CarbonRowBatch.java | 31 ++++++++++++++------
.../sort/impl/ParallelReadMergeSorterImpl.java | 12 ++++----
...arallelReadMergeSorterWithBucketingImpl.java | 7 ++---
.../impl/UnsafeParallelReadMergeSorterImpl.java | 7 ++---
.../holder/UnsafeSortTempFileChunkHolder.java | 4 ++-
.../steps/DataConverterProcessorStepImpl.java | 7 ++---
...ConverterProcessorWithBucketingStepImpl.java | 7 ++---
.../steps/DataWriterProcessorStepImpl.java | 5 ++--
.../newflow/steps/DummyClassForTest.java | 5 ++--
.../newflow/steps/InputProcessorStepImpl.java | 3 +-
.../sortandgroupby/sortdata/SortDataRows.java | 5 ++--
.../sortandgroupby/sortdata/SortParameters.java | 10 +++++--
.../sortdata/SortTempFileChunkHolder.java | 6 +++-
18 files changed, 111 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/conf/carbon.properties.template
----------------------------------------------------------------------
diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template
index d7356a6..9b85c75 100644
--- a/conf/carbon.properties.template
+++ b/conf/carbon.properties.template
@@ -113,4 +113,6 @@ carbon.enable.quick.filter=false
##The property to set the date to be considered as start date for calculating the timestamp.
#carbon.cutOffTimestamp=2000-01-01 00:00:00
##The property to set the timestamp (ie milis) conversion to the SECOND, MINUTE, HOUR or DAY level.
-#carbon.timegranularity=SECOND
\ No newline at end of file
+#carbon.timegranularity=SECOND
+##the number of prefetched rows in sort step
+#carbon.prefetch.buffersize=1000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 437144c..f1faa57 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -267,7 +267,7 @@ public final class CarbonCommonConstants {
/**
* SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE
*/
- public static final String CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE = "50000";
+ public static final String CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE = "16384";
/**
* Number of cores to be used while loading
*/
@@ -638,10 +638,15 @@ public final class CarbonCommonConstants {
* for dimensions , one of ignore dictionary dimensions , one for measures.
*/
public static final int ARRAYSIZE = 3;
+
/**
* CARBON_PREFETCH_BUFFERSIZE
*/
- public static final int CARBON_PREFETCH_BUFFERSIZE = 20000;
+ public static final String CARBON_PREFETCH_BUFFERSIZE = "carbon.prefetch.buffersize";
+ /**
+ * CARBON_PREFETCH_BUFFERSIZE DEFAULT VALUE
+ */
+ public static final String CARBON_PREFETCH_BUFFERSIZE_DEFAULT = "1000";
/**
* CARBON_PREFETCH_IN_MERGE
*/
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 8f5ad25..962d352 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -84,6 +84,27 @@ public final class CarbonProperties {
validateHighCardinalityInRowCountPercentage();
validateCarbonDataFileVersion();
validateExecutorStartUpTime();
+ validatePrefetchBufferSize();
+ }
+
+ private void validatePrefetchBufferSize() {
+ String prefetchBufferSizeStr =
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+
+ if (null == prefetchBufferSizeStr || prefetchBufferSizeStr.length() == 0) {
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT);
+ } else {
+ try {
+ Integer.parseInt(prefetchBufferSizeStr);
+ } catch (NumberFormatException e) {
+ LOGGER.info("The prefetch buffer size value \"" + prefetchBufferSizeStr
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT + "\"");
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT);
+ }
+ }
}
private void validateBadRecordsLocation() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index f38175d..269a127 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -231,6 +231,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (csvParser == null) {
+ return false;
+ }
columns = csvParser.parseNext();
if (columns == null) {
value = null;
@@ -275,9 +278,18 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
if (reader != null) {
reader.close();
}
+ if (boundedInputStream != null) {
+ boundedInputStream.close();
+ }
} finally {
+ reader = null;
+ boundedInputStream = null;
+ csvParser = null;
+ filePosition = null;
+ value = null;
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
index 18d6aeb..21030cb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
@@ -128,10 +128,9 @@ public abstract class AbstractDataLoadProcessorStep {
* @return processed row.
*/
protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
- CarbonRowBatch newBatch = new CarbonRowBatch();
- Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
- while (batchIterator.hasNext()) {
- newBatch.addRow(processRow(batchIterator.next()));
+ CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+ while (rowBatch.hasNext()) {
+ newBatch.addRow(processRow(rowBatch.next()));
}
return newBatch;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
index 941b51d..3c6a7d3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -17,27 +17,40 @@
package org.apache.carbondata.processing.newflow.row;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
/**
* Batch of rows.
*/
-public class CarbonRowBatch {
+public class CarbonRowBatch implements Iterator<CarbonRow> {
- private List<CarbonRow> rowBatch = new ArrayList<>();
+ private CarbonRow[] rowBatch;
- public void addRow(CarbonRow carbonRow) {
- rowBatch.add(carbonRow);
+ private int size = 0;
+
+ private int index = 0;
+
+ public CarbonRowBatch(int batchSize) {
+ this.rowBatch = new CarbonRow[batchSize];
}
- public Iterator<CarbonRow> getBatchIterator() {
- return rowBatch.iterator();
+ public void addRow(CarbonRow carbonRow) {
+ rowBatch[size++] = carbonRow;
}
public int getSize() {
- return rowBatch.size();
+ return size;
+ }
+
+ @Override public boolean hasNext() {
+ return index < size;
+ }
+
+ @Override public CarbonRow next() {
+ return rowBatch[index++];
}
+ @Override public void remove() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 16c5122..8c04f8f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -111,6 +111,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
}
try {
intermediateFileMerger.finish();
+ intermediateFileMerger = null;
finalMerger.startFinalMerge();
} catch (CarbonDataWriterException e) {
throw new CarbonDataLoadingException(e);
@@ -129,7 +130,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
@Override
public CarbonRowBatch next() {
int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch();
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
while (finalMerger.hasNext() && counter < batchSize) {
rowBatch.addRow(new CarbonRow(finalMerger.next()));
counter++;
@@ -141,7 +142,9 @@ public class ParallelReadMergeSorterImpl implements Sorter {
}
@Override public void close() {
- intermediateFileMerger.close();
+ if (intermediateFileMerger != null) {
+ intermediateFileMerger.close();
+ }
}
/**
@@ -200,10 +203,9 @@ public class ParallelReadMergeSorterImpl implements Sorter {
try {
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
- Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
int i = 0;
- while (batchIterator.hasNext()) {
- CarbonRow row = batchIterator.next();
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
if (row != null) {
buffer[i++] = row.getData();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 245302f..813d83d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -209,10 +209,9 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
try {
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
- Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
int i = 0;
- while (batchIterator.hasNext()) {
- CarbonRow row = batchIterator.next();
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
if (row != null) {
SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
synchronized (sortDataRow) {
@@ -257,7 +256,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
@Override public CarbonRowBatch next() {
int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch();
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
while (finalMerger.hasNext() && counter < batchSize) {
rowBatch.addRow(new CarbonRow(finalMerger.next()));
counter++;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index d40b763..18cf314 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -124,7 +124,7 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
@Override public CarbonRowBatch next() {
int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch();
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
while (finalMerger.hasNext() && counter < batchSize) {
rowBatch.addRow(new CarbonRow(finalMerger.next()));
counter++;
@@ -194,10 +194,9 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
try {
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
- Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
int i = 0;
- while (batchIterator.hasNext()) {
- CarbonRow row = batchIterator.next();
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
if (row != null) {
buffer[i++] = row.getData();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index a9a63d3..0c89821 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -167,7 +167,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
- bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+ bufferSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 1a6535f..275f017 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -89,10 +89,9 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
* @return processed row.
*/
protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
- CarbonRowBatch newBatch = new CarbonRowBatch();
- Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
- while (batchIterator.hasNext()) {
- newBatch.addRow(localConverter.convert(batchIterator.next()));
+ CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+ while (rowBatch.hasNext()) {
+ newBatch.addRow(localConverter.convert(rowBatch.next()));
}
rowCounter.getAndAdd(newBatch.getSize());
return newBatch;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 0223b04..fef4aaa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -113,10 +113,9 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
* @return processed row.
*/
protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
- CarbonRowBatch newBatch = new CarbonRowBatch();
- Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
- while (batchIterator.hasNext()) {
- CarbonRow next = batchIterator.next();
+ CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+ while (rowBatch.hasNext()) {
+ CarbonRow next = rowBatch.next();
CarbonRow convertRow = localConverter.convert(next);
convertRow.bucketNumber = (short) partitioner.getPartition(next.getData());
newBatch.addRow(convertRow);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 710cc4f..dfc03b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -179,10 +179,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
throws CarbonDataLoadingException {
- Iterator<CarbonRow> iterator = batch.getBatchIterator();
try {
- while (iterator.hasNext()) {
- CarbonRow row = iterator.next();
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
readCounter++;
Object[] outputRow;
// adding one for the high cardinality dims byte array.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
index a7d8e7f..8b8000b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
@@ -89,9 +89,8 @@ class DummyThread implements Callable<Void> {
try {
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
- Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
- while (batchIterator.hasNext()) {
- CarbonRow row = batchIterator.next();
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
// do nothing
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 0097690..7c50a10 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -175,6 +175,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
boolean hasNext = currentIterator.hasNext();
// If iterator is finished then check for next iterator.
if (!hasNext) {
+ currentIterator.close();
// Check next iterator is available in the list.
if (counter < inputIterators.size()) {
// Get the next iterator from the list.
@@ -228,7 +229,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
private CarbonRowBatch getBatch() {
// Create batch and fill it.
- CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
+ CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
int count = 0;
while (internalHasNext() && count < batchSize) {
carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 1384ec9..a29b426 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -149,11 +149,12 @@ public class SortDataRows {
// if record holder list size is equal to sort buffer size then it will
// sort the list and then write current list data to file
synchronized (addRowsLock) {
+ int sizeLeft = 0;
if (entryCount + size >= sortBufferSize) {
LOGGER.debug("************ Writing to temp file ********** ");
intermediateFileMerger.startMergingIfPossible();
Object[][] recordHolderListLocal = recordHolderList;
- int sizeLeft = sortBufferSize - entryCount ;
+ sizeLeft = sortBufferSize - entryCount ;
if (sizeLeft > 0) {
System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
}
@@ -172,7 +173,7 @@ public class SortDataRows {
return;
}
}
- System.arraycopy(rowBatch, 0, recordHolderList, entryCount, size);
+ System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
entryCount += size;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 39a9bc3..dc40efe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -425,7 +425,9 @@ public class SortParameters {
}
parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
- parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+ parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
char[] aggType = CarbonDataProcessorUtil
.getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
@@ -525,8 +527,10 @@ public class SortParameters {
LOGGER.info("Compression will be used for writing the sort temp File");
}
- parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
- parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+ parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE);
+ parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
char[] aggType = CarbonDataProcessorUtil
.getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c5aba5f5/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index f088e75..416a445 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -178,7 +178,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
- bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+ bufferSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
@@ -464,6 +466,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
public void closeStream() {
CarbonUtil.closeStreams(stream);
executorService.shutdown();
+ this.backupBuffer = null;
+ this.currentBuffer = null;
}
/**
[2/2] incubator-carbondata git commit: [CARBONDATA-701] Fix memory leak issue for new flow data loading. This closes #594
Posted by ra...@apache.org.
[CARBONDATA-701] Fix memory leak issue for new flow data loading. This closes #594
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a6c8d2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a6c8d2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a6c8d2a7
Branch: refs/heads/master
Commit: a6c8d2a79716937adf74962c2c3db19923f1b0e7
Parents: b677601 c5aba5f
Author: ravipesala <ra...@gmail.com>
Authored: Fri Feb 17 19:38:44 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Feb 17 19:38:44 2017 +0530
----------------------------------------------------------------------
conf/carbon.properties.template | 4 ++-
.../core/constants/CarbonCommonConstants.java | 9 ++++--
.../carbondata/core/util/CarbonProperties.java | 21 +++++++++++++
.../processing/csvload/CSVInputFormat.java | 12 ++++++++
.../newflow/AbstractDataLoadProcessorStep.java | 7 ++---
.../processing/newflow/row/CarbonRowBatch.java | 31 ++++++++++++++------
.../sort/impl/ParallelReadMergeSorterImpl.java | 12 ++++----
...arallelReadMergeSorterWithBucketingImpl.java | 7 ++---
.../impl/UnsafeParallelReadMergeSorterImpl.java | 7 ++---
.../holder/UnsafeSortTempFileChunkHolder.java | 4 ++-
.../steps/DataConverterProcessorStepImpl.java | 7 ++---
...ConverterProcessorWithBucketingStepImpl.java | 7 ++---
.../steps/DataWriterProcessorStepImpl.java | 5 ++--
.../newflow/steps/DummyClassForTest.java | 5 ++--
.../newflow/steps/InputProcessorStepImpl.java | 3 +-
.../sortandgroupby/sortdata/SortDataRows.java | 5 ++--
.../sortandgroupby/sortdata/SortParameters.java | 10 +++++--
.../sortdata/SortTempFileChunkHolder.java | 6 +++-
18 files changed, 111 insertions(+), 51 deletions(-)
----------------------------------------------------------------------