You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/03 13:32:40 UTC
[1/2] incubator-carbondata git commit: Fixed data loading storing fewer
rows than the actual row count when the data size is a multiple of the page size.
Repository: incubator-carbondata
Updated Branches:
refs/heads/master f2fdf2962 -> 8410081cd
Fixed data loading storing fewer rows than the actual row count when the data size is a multiple of the page size.
Fixed the test case.
Closed resources properly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1fa2df9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1fa2df9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1fa2df9d
Branch: refs/heads/master
Commit: 1fa2df9df1ffba858e5e198f8fec42575e92606c
Parents: f2fdf29
Author: ravipesala <ra...@gmail.com>
Authored: Mon May 1 08:06:58 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Wed May 3 21:31:19 2017 +0800
----------------------------------------------------------------------
.../testsuite/dataload/TestLoadDataFrame.scala | 16 ++++++++++++----
.../newflow/converter/impl/RowConverterImpl.java | 5 ++++-
.../steps/DataConverterProcessorStepImpl.java | 1 +
.../store/CarbonFactDataHandlerColumnar.java | 13 +++++++------
.../writer/v1/CarbonFactDataWriterImplV1.java | 3 +++
.../writer/v2/CarbonFactDataWriterImplV2.java | 3 +++
.../writer/v3/CarbonFactDataWriterImplV3.java | 18 ++++++++----------
.../store/writer/v3/DataWriterHolder.java | 8 ++++++++
8 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 3b0fd4a..2d86497 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -31,7 +31,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
def buildTestData() = {
import sqlContext.implicits._
- df = sqlContext.sparkContext.parallelize(1 to 1000)
+ df = sqlContext.sparkContext.parallelize(1 to 32000)
.map(x => ("a", "b", x))
.toDF("c1", "c2", "c3")
@@ -62,6 +62,8 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
buildTestData
}
+
+
test("test load dataframe with saving compressed csv files") {
// save dataframe to carbon file
df.write
@@ -72,7 +74,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
- sql("select count(*) from carbon1 where c3 > 500"), Row(500)
+ sql("select count(*) from carbon1 where c3 > 500"), Row(31500)
)
}
@@ -86,7 +88,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
- sql("select count(*) from carbon2 where c3 > 500"), Row(500)
+ sql("select count(*) from carbon2 where c3 > 500"), Row(31500)
)
}
@@ -99,7 +101,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
- sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+ sql("select count(*) from carbon3 where c3 > 500"), Row(31500)
)
}
@@ -114,6 +116,12 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
sql("SELECT decimal FROM carbon4"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44))))
}
+ test("test loading data if the data count is multiple of page size"){
+ checkAnswer(
+ sql("SELECT count(*) FROM carbon2"),Seq(Row(32000)))
+ }
+
+
override def afterAll {
dropTable
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 2471314..5a476da 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -178,7 +178,10 @@ public class RowConverterImpl implements RowConverter {
client.shutDown();
}
}
- executorService.shutdownNow();
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index cc99469..5d065b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -85,6 +85,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
if (first) {
first = false;
localConverter = converters.get(0).createCopyForNewThread();
+ converters.add(localConverter);
}
return childIter.hasNext();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 7a80f72..f6ceb84 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -589,6 +589,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/** generate the NodeHolder from the input rows */
private NodeHolder processDataRows(List<Object[]> dataRows)
throws CarbonDataWriterException {
+ if (dataRows.size() == 0) {
+ return new NodeHolder();
+ }
// to store index of the measure columns which are null
BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
// statistics for one blocklet/page
@@ -859,12 +862,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
public void finish() throws CarbonDataWriterException {
// still some data is present in stores if entryCount is more
// than 0
- if (this.entryCount > 0) {
- producerExecutorServiceTaskList.add(producerExecutorService
- .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
- blockletProcessingCount.incrementAndGet();
- processedDataCount += entryCount;
- }
+ producerExecutorServiceTaskList.add(producerExecutorService
+ .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+ blockletProcessingCount.incrementAndGet();
+ processedDataCount += entryCount;
closeWriterExecutionService(producerExecutorService);
processWriteTaskSubmitList(producerExecutorServiceTaskList);
processingComplete = true;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index 64077e2..bc9b453 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -198,6 +198,9 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
}
@Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+ if (holder.getEntryCount() == 0) {
+ return;
+ }
int indexBlockSize = 0;
for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index ec79186..e8b43a0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -65,6 +65,9 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
* @throws CarbonDataWriterException any problem in writing operation
*/
@Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+ if (holder.getEntryCount() == 0) {
+ return;
+ }
// size to calculate the size of the blocklet
int size = 0;
// get the blocklet info object
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 6f05b69..bb80d1e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -324,7 +324,9 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
isAdded = true;
dataWriterHolder.addNodeHolder(holder);
}
- LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+
+ LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
+ + " :Rows Added: " + dataWriterHolder.getTotalRows());
// write the data
writeDataToFile(fileChannel);
}
@@ -334,16 +336,12 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
} else {
//for last blocklet check if the last page will exceed the blocklet size then write
// existing pages and then last page
- if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize
- && dataWriterHolder.getNodeHolder().size() > 0) {
- LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
- writeDataToFile(fileChannel);
- dataWriterHolder.addNodeHolder(holder);
- LOGGER.info("Number of Pages for blocklet is: " + "1");
- writeDataToFile(fileChannel);
- } else {
+ if (holder.getEntryCount() > 0) {
dataWriterHolder.addNodeHolder(holder);
- LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+ }
+ if (dataWriterHolder.getNumberOfPagesAdded() > 0) {
+ LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
+ + " :Rows Added: " + dataWriterHolder.getTotalRows());
writeDataToFile(fileChannel);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
index 4368b2b..a98f388 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
@@ -48,6 +48,14 @@ public class DataWriterHolder {
return nodeHolder.size();
}
+ public int getTotalRows() {
+ int rows = 0;
+ for (NodeHolder nh : nodeHolder) {
+ rows += nh.getEntryCount();
+ }
+ return rows;
+ }
+
public List<NodeHolder> getNodeHolder() {
return nodeHolder;
}
[2/2] incubator-carbondata git commit: [CARBONDATA-1005] Fixed
data loading storing fewer rows than the actual row count when the data size
is a multiple of the page size. This closes #870
Posted by ja...@apache.org.
[CARBONDATA-1005] Fixed data loading storing fewer rows than the actual row count when the data size is a multiple of the page size. This closes #870
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8410081c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8410081c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8410081c
Branch: refs/heads/master
Commit: 8410081cd3af5e0bb530c6807b4abe969f4e9795
Parents: f2fdf29 1fa2df9
Author: jackylk <ja...@huawei.com>
Authored: Wed May 3 21:31:50 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed May 3 21:31:50 2017 +0800
----------------------------------------------------------------------
.../testsuite/dataload/TestLoadDataFrame.scala | 16 ++++++++++++----
.../newflow/converter/impl/RowConverterImpl.java | 5 ++++-
.../steps/DataConverterProcessorStepImpl.java | 1 +
.../store/CarbonFactDataHandlerColumnar.java | 13 +++++++------
.../writer/v1/CarbonFactDataWriterImplV1.java | 3 +++
.../writer/v2/CarbonFactDataWriterImplV2.java | 3 +++
.../writer/v3/CarbonFactDataWriterImplV3.java | 18 ++++++++----------
.../store/writer/v3/DataWriterHolder.java | 8 ++++++++
8 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------