Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:30 UTC

[28/45] carbondata git commit: [CARBONDATA-2987] Data mismatch after compaction with measure sort columns

[CARBONDATA-2987] Data mismatch after compaction with measure sort columns

problem: Data mismatch after compaction with measure sort columns

root cause: In the compaction flow (DictionaryBasedResultCollector), the inverted index mapping was not handled in ColumnPageWrapper. Because of this the row ID was wrong, and rows of no-dictionary dimension columns picked up data from other rows.
Hence the data mismatch.

solution: Handle the inverted index mapping in ColumnPageWrapper for the DictionaryBasedResultCollector flow.
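
For context, a toy sketch of the mapping involved, assuming a page whose values are stored sorted with a reverse index back to load order (plain Java, hypothetical names, not CarbonData internals):

    public class InvertedIndexSketch {
      public static void main(String[] args) {
        // Rows in load (logical) order.
        String[] original = {"cat", "apple", "bird"};
        // The column page stores the values sorted.
        String[] sortedStorage = {"apple", "bird", "cat"};
        // Reverse index: logical row id -> position in sorted storage.
        int[] invertedReverseIndex = {2, 0, 1};

        int rowId = 0; // logical row we want to read ("cat")
        // Bug: reading the storage directly returns another row's value.
        System.out.println("without mapping: " + sortedStorage[rowId]);
        // Fix: translate the row id through the reverse index first.
        System.out.println("with mapping:    " + sortedStorage[invertedReverseIndex[rowId]]);
      }
    }

In the diff below, getChunkData(rowId) in ColumnPageWrapper performs the equivalent translation via getInvertedReverseIndex(rowId) when isExplicitSorted() is true.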

This closes #2784


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0b16816d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0b16816d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0b16816d

Branch: refs/heads/branch-1.5
Commit: 0b16816da7b401318929bfe973dad4bf397e90d9
Parents: 6ef4e46
Author: ajantha-bhat <aj...@gmail.com>
Authored: Fri Sep 28 16:27:55 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Oct 3 20:09:13 2018 +0530

----------------------------------------------------------------------
 .../chunk/store/ColumnPageWrapper.java          | 52 +++++++----
 .../compaction/nodictionary_compaction.csv      |  3 +
 .../MajorCompactionWithMeasureSortColumns.scala | 97 ++++++++++++++++++++
 3 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b16816d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 098287e..627c75f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -122,10 +122,19 @@ public class ColumnPageWrapper implements DimensionColumnPage {
   }
 
   @Override public byte[] getChunkData(int rowId) {
-    return getChunkData(rowId, false);
+    byte[] nullBitSet = getNullBitSet(rowId, columnPage.getColumnSpec().getColumnType());
+    if (nullBitSet != null) {
+      // if this row is null, return default null represent in byte array
+      return nullBitSet;
+    } else {
+      if (isExplicitSorted()) {
+        rowId = getInvertedReverseIndex(rowId);
+      }
+      return getChunkDataInBytes(rowId);
+    }
   }
 
-  private byte[] getChunkData(int rowId, boolean isRowIdChanged) {
+  private byte[] getChunkDataInBytes(int rowId) {
     ColumnType columnType = columnPage.getColumnSpec().getColumnType();
     DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
     DataType targetDataType = columnPage.getDataType();
@@ -134,15 +143,6 @@ public class ColumnPageWrapper implements DimensionColumnPage {
           .getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
     } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && isAdaptiveEncoded()) || (
         columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) {
-      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
-          && columnType == ColumnType.COMPLEX_PRIMITIVE) {
-        // if this row is null, return default null represent in byte array
-        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-      }
-      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
-        // if this row is null, return default null represent in byte array
-        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
-      }
       if (srcDataType == DataTypes.FLOAT) {
         float floatData = columnPage.getFloat(rowId);
         return ByteUtil.toXorBytes(floatData);
@@ -182,9 +182,6 @@ public class ColumnPageWrapper implements DimensionColumnPage {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
     } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !isAdaptiveEncoded())) {
-      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
-        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
-      }
       if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) {
         byte[] out = new byte[1];
         out[0] = (columnPage.getByte(rowId));
@@ -205,6 +202,18 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     }
   }
 
+  private byte[] getNullBitSet(int rowId, ColumnType columnType) {
+    if (columnPage.getNullBits().get(rowId) && columnType == ColumnType.COMPLEX_PRIMITIVE) {
+      // if this row is null, return default null represent in byte array
+      return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+    }
+    if (columnPage.getNullBits().get(rowId)) {
+      // if this row is null, return default null represent in byte array
+      return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+    }
+    return null;
+  }
+
   private Object getActualData(int rowId, boolean isRowIdChanged) {
     ColumnType columnType = columnPage.getColumnSpec().getColumnType();
     DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
@@ -302,8 +311,19 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   @Override
   public int compareTo(int rowId, byte[] compareValue) {
-    byte[] chunkData = this.getChunkData((int) rowId);
-    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
+    // rowId is the inverted index, but the null bitset is based on actual data
+    int nullBitSetRowId = rowId;
+    if (isExplicitSorted()) {
+      nullBitSetRowId = getInvertedReverseIndex(rowId);
+    }
+    byte[] nullBitSet = getNullBitSet(nullBitSetRowId, columnPage.getColumnSpec().getColumnType());
+    if (nullBitSet != null) {
+      // if this row is null, return default null represent in byte array
+      return ByteUtil.UnsafeComparer.INSTANCE.compareTo(nullBitSet, compareValue);
+    } else {
+      byte[] chunkData = this.getChunkDataInBytes(rowId);
+      return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b16816d/integration/spark-common-test/src/test/resources/compaction/nodictionary_compaction.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/compaction/nodictionary_compaction.csv b/integration/spark-common-test/src/test/resources/compaction/nodictionary_compaction.csv
new file mode 100644
index 0000000..2518fd2
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/compaction/nodictionary_compaction.csv
@@ -0,0 +1,3 @@
+code1,code2,country_code,category_id,product_id,date,count1,count2,count3
+"51job, Inc.",21695-534,FR,610,60,2017-11-27,4483,0,510
+Intercontinental Exchange Inc.,22100-020,TH,87,4,2017-10-16,2,647,69630

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b16816d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionWithMeasureSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionWithMeasureSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionWithMeasureSortColumns.scala
new file mode 100644
index 0000000..ff56619
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionWithMeasureSortColumns.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+
+class MajorCompactionWithMeasureSortColumns extends QueryTest with BeforeAndAfterAll {
+
+  val csvFilePath = s"$resourcesPath/compaction/nodictionary_compaction.csv"
+  val backupDateFormat = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists store")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
+
+  override def afterAll {
+    sql("drop table if exists  store")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, backupDateFormat)
+  }
+
+  test("test major compaction with measure sort columns") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "1024")
+
+    val createStoreTableSql =
+      s"""
+         | CREATE TABLE IF NOT EXISTS store(
+         | code1 STRING,
+         | code2 STRING,
+         | country_code STRING,
+         | category_id INTEGER,
+         | product_id LONG,
+         | date DATE,
+         | count1 LONG,
+         | count2 LONG,
+         | count3 LONG
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'SORT_COLUMNS'='code1, code2, country_code, date, category_id, product_id',
+         | 'SORT_SCOPE'='LOCAL_SORT',
+         | 'CACHE_LEVEL'='BLOCKLET'
+         | )
+      """.stripMargin
+    sql(createStoreTableSql)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvFilePath'
+         | INTO TABLE store
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin).show(false)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvFilePath'
+         | INTO TABLE store
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin).show(false)
+
+    val csvRows = sqlContext.sparkSession.read.option("header", "true")
+      .csv(csvFilePath).orderBy("code1")
+
+    sql("ALTER TABLE store COMPACT 'MAJOR'")
+
+    val answer = sql("select * from store ").orderBy("code1")
+    assert(answer.except(csvRows).count() == 0)
+    sql("drop table store")
+  }
+
+}