You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/22 09:45:31 UTC

[3/4] carbondata git commit: Fixed issue of more records after update.

Fixed issue of more records after update.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3319851b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3319851b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3319851b

Branch: refs/heads/master
Commit: 3319851bfe6d14369112d6bd0d4d3c1f670aa777
Parents: 843c26c
Author: ravipesala <ra...@gmail.com>
Authored: Wed Jun 21 14:14:44 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Jun 22 15:13:02 2017 +0530

----------------------------------------------------------------------
 .../DictionaryBasedVectorResultCollector.java   |  6 +++--
 .../core/scan/result/AbstractScannedResult.java |  5 +++-
 .../scan/scanner/AbstractBlockletScanner.java   | 19 ++++++++++++++-
 .../iud/UpdateCarbonTableTestCase.scala         | 25 +++++++++++++++++++-
 4 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 3203934..73ccb5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -130,6 +130,7 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
   @Override public void collectVectorBatch(AbstractScannedResult scannedResult,
       CarbonColumnarBatch columnarBatch) {
     int numberOfPages = scannedResult.numberOfpages();
+    int filteredRows = 0;
     while (scannedResult.getCurrentPageCounter() < numberOfPages) {
       int currentPageRowCount = scannedResult.getCurrentPageRowCount();
       if (currentPageRowCount == 0) {
@@ -138,13 +139,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
       }
       int rowCounter = scannedResult.getRowCounter();
       int availableRows = currentPageRowCount - rowCounter;
-      int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize();
+      int requiredRows =
+          columnarBatch.getBatchSize() - (columnarBatch.getActualSize() + filteredRows);
       requiredRows = Math.min(requiredRows, availableRows);
       if (requiredRows < 1) {
         return;
       }
       fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
-      int filteredRows = scannedResult
+      filteredRows = scannedResult
           .markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
       scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
       columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 0a2d670..e78383d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -324,6 +324,9 @@ public abstract class AbstractScannedResult {
     rowCounter = 0;
     currentRow = -1;
     pageCounter++;
+    if (null != deletedRecordMap) {
+      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
+    }
   }
 
   public int numberOfpages() {
@@ -479,7 +482,7 @@ public abstract class AbstractScannedResult {
       rowCounter = 0;
       currentRow = -1;
       if (null != deletedRecordMap) {
-        currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + pageCounter + "");
+        currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
       }
       return hasNext();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index f3d1336..022e351 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.scanner;
 import java.io.IOException;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -31,6 +32,7 @@ import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * Blocklet scanner class to process the block
@@ -46,6 +48,10 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
 
   private AbstractScannedResult emptyResult;
 
+  private static int NUMBER_OF_ROWS_PER_PAGE = Integer.parseInt(CarbonProperties.getInstance()
+      .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
+
   public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
     this.blockExecutionInfo = tableBlockExecutionInfos;
   }
@@ -95,7 +101,7 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
       }
     }
     scannedResult.setMeasureChunks(measureColumnDataChunks);
-    int[] numberOfRows = new int[] { blocksChunkHolder.getDataBlock().nodeSize() };
+    int[] numberOfRows = null;
     if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
       for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
         if (dimensionRawColumnChunks[i] != null) {
@@ -111,6 +117,17 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
         }
       }
     }
+    // count(*)  case there would not be any dimensions are measures selected.
+    if (numberOfRows == null) {
+      numberOfRows = new int[blocksChunkHolder.getDataBlock().numberOfPages()];
+      for (int i = 0; i < numberOfRows.length; i++) {
+        numberOfRows[i] = NUMBER_OF_ROWS_PER_PAGE;
+      }
+      int lastPageSize = blocksChunkHolder.getDataBlock().nodeSize() % NUMBER_OF_ROWS_PER_PAGE;
+      if (lastPageSize > 0) {
+        numberOfRows[numberOfRows.length - 1] = lastPageSize;
+      }
+    }
     scannedResult.setNumberOfRows(numberOfRows);
     scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
     // adding statistics for carbon scan time

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3319851b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 7917b61..79fda30 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.iud
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -386,6 +386,29 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("More records after update operation ") {
+    sql("DROP TABLE IF EXISTS default.carbon1")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 36000)
+      .map(x => (x+"a", "b", x))
+      .toDF("c1", "c2", "c3")
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+
+    sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show()
+
+    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+
+    sql("DROP TABLE IF EXISTS default.carbon1")
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")