You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/22 09:45:31 UTC
[3/4] carbondata git commit: Fixed issue of more records after update.
Fixed issue of more records after update.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3319851b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3319851b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3319851b
Branch: refs/heads/master
Commit: 3319851bfe6d14369112d6bd0d4d3c1f670aa777
Parents: 843c26c
Author: ravipesala <ra...@gmail.com>
Authored: Wed Jun 21 14:14:44 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Jun 22 15:13:02 2017 +0530
----------------------------------------------------------------------
.../DictionaryBasedVectorResultCollector.java | 6 +++--
.../core/scan/result/AbstractScannedResult.java | 5 +++-
.../scan/scanner/AbstractBlockletScanner.java | 19 ++++++++++++++-
.../iud/UpdateCarbonTableTestCase.scala | 25 +++++++++++++++++++-
4 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 3203934..73ccb5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -130,6 +130,7 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
@Override public void collectVectorBatch(AbstractScannedResult scannedResult,
CarbonColumnarBatch columnarBatch) {
int numberOfPages = scannedResult.numberOfpages();
+ int filteredRows = 0;
while (scannedResult.getCurrentPageCounter() < numberOfPages) {
int currentPageRowCount = scannedResult.getCurrentPageRowCount();
if (currentPageRowCount == 0) {
@@ -138,13 +139,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
}
int rowCounter = scannedResult.getRowCounter();
int availableRows = currentPageRowCount - rowCounter;
- int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize();
+ int requiredRows =
+ columnarBatch.getBatchSize() - (columnarBatch.getActualSize() + filteredRows);
requiredRows = Math.min(requiredRows, availableRows);
if (requiredRows < 1) {
return;
}
fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
- int filteredRows = scannedResult
+ filteredRows = scannedResult
.markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 0a2d670..e78383d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -324,6 +324,9 @@ public abstract class AbstractScannedResult {
rowCounter = 0;
currentRow = -1;
pageCounter++;
+ if (null != deletedRecordMap) {
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
+ }
}
public int numberOfpages() {
@@ -479,7 +482,7 @@ public abstract class AbstractScannedResult {
rowCounter = 0;
currentRow = -1;
if (null != deletedRecordMap) {
- currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + pageCounter + "");
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
}
return hasNext();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index f3d1336..022e351 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.scanner;
import java.io.IOException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -31,6 +32,7 @@ import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.CarbonProperties;
/**
* Blocklet scanner class to process the block
@@ -46,6 +48,10 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
private AbstractScannedResult emptyResult;
+ private static int NUMBER_OF_ROWS_PER_PAGE = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
+
public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
this.blockExecutionInfo = tableBlockExecutionInfos;
}
@@ -95,7 +101,7 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
}
}
scannedResult.setMeasureChunks(measureColumnDataChunks);
- int[] numberOfRows = new int[] { blocksChunkHolder.getDataBlock().nodeSize() };
+ int[] numberOfRows = null;
if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
if (dimensionRawColumnChunks[i] != null) {
@@ -111,6 +117,17 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
}
}
}
+ // count(*) case: there would not be any dimensions or measures selected.
+ if (numberOfRows == null) {
+ numberOfRows = new int[blocksChunkHolder.getDataBlock().numberOfPages()];
+ for (int i = 0; i < numberOfRows.length; i++) {
+ numberOfRows[i] = NUMBER_OF_ROWS_PER_PAGE;
+ }
+ int lastPageSize = blocksChunkHolder.getDataBlock().nodeSize() % NUMBER_OF_ROWS_PER_PAGE;
+ if (lastPageSize > 0) {
+ numberOfRows[numberOfRows.length - 1] = lastPageSize;
+ }
+ }
scannedResult.setNumberOfRows(numberOfRows);
scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
// adding statistics for carbon scan time
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 7917b61..79fda30 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -16,7 +16,7 @@
*/
package org.apache.carbondata.spark.testsuite.iud
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -386,6 +386,29 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
}
}
+ test("More records after update operation ") {
+ sql("DROP TABLE IF EXISTS default.carbon1")
+ import sqlContext.implicits._
+ val df = sqlContext.sparkContext.parallelize(1 to 36000)
+ .map(x => (x+"a", "b", x))
+ .toDF("c1", "c2", "c3")
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon1")
+ .option("tempCSV", "true")
+ .option("compress", "true")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+
+ sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show()
+
+ checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+
+ sql("DROP TABLE IF EXISTS default.carbon1")
+ }
+
override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")