You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/27 09:51:56 UTC

carbondata git commit: [CARBONDATA-3023] Alter add column issue with SORT_COLUMNS

Repository: carbondata
Updated Branches:
  refs/heads/master bbbe47905 -> 8e570360e


[CARBONDATA-3023] Alter add column issue with SORT_COLUMNS

 Problem-1:
In case of ALTER ADD COLUMNS, the newly added column is appended to the end of the schema. But if that column is a SORT_COLUMN, then while loading we expect all the SORT_COLUMNS to come first.

Solution:
While getting the schema from the carbonTable, rearrange the schema so that all the SORT_COLUMNS come first.

Problem-2:
After an ALTER DROP column followed by an ALTER ADD of a new column on a partition table, LOAD fails, because the load also considers the dropped columns.

Solution:
While loading the partition table, take only the existing visible columns from the table. After DROP column, the dropped column becomes invisible. We should not consider the dropped columns while loading.

Problem-3:
(1) While checking the null bitsets for the adaptive encoded primitive types, the null bitsets are based on the actual rowId, but currently we are checking them using the reverseInvertedIndex.
(2) In case of range filters, the special null value (@NU#LL$!) is removed for a noDictionary column because binary search cannot be done on it. But in the adaptive encoded page we do not use the special null value for binary search.

Solution:
(1) The actual rowId for checking nullBitSets should be taken from the invertedIndex.
(2) Don't remove the special null value (@NU#LL$!) in case of an adaptive encoded page.

This closes #2826


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8e570360
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8e570360
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8e570360

Branch: refs/heads/master
Commit: 8e570360eb8d3e52844c760d3d7da84ff579aa90
Parents: bbbe479
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Oct 17 16:22:39 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Oct 27 15:20:27 2018 +0530

----------------------------------------------------------------------
 .../chunk/store/ColumnPageWrapper.java          | 10 +++--
 .../executer/RangeValueFilterExecuterImpl.java  |  2 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 16 ++++++++
 .../command/carbonTableSchemaCommon.scala       |  1 +
 .../management/CarbonLoadDataCommand.scala      |  5 ++-
 .../AlterTableValidationTestCase.scala          | 40 ++++++++++++++++++++
 6 files changed, 67 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e570360/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index ba853f9..a1c4aec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -252,12 +252,14 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     // rowId is the inverted index, but the null bitset is based on actual data
     int nullBitSetRowId = rowId;
     if (isExplicitSorted()) {
-      nullBitSetRowId = getInvertedReverseIndex(rowId);
+      nullBitSetRowId = getInvertedIndex(rowId);
     }
     byte[] nullBitSet = getNullBitSet(nullBitSetRowId, columnPage.getColumnSpec().getColumnType());
-    if (nullBitSet != null) {
-      // if this row is null, return default null represent in byte array
-      return ByteUtil.UnsafeComparer.INSTANCE.compareTo(nullBitSet, compareValue);
+    if (nullBitSet != null
+        && ByteUtil.UnsafeComparer.INSTANCE.compareTo(nullBitSet, compareValue) == 0) {
+      // check if the compare value is a null value
+      // if the compare value is null and the data is also null we can directly return 0
+      return 0;
     } else {
       byte[] chunkData = this.getChunkDataInBytes(rowId);
       return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e570360/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 970756a..e84e82d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -549,7 +549,7 @@ public class RangeValueFilterExecuterImpl implements FilterExecuter {
 
     // Binary Search cannot be done on '@NU#LL$!", so need to check and compare for null on
     // matching row.
-    if (dimensionColumnPage.isNoDicitionaryColumn()) {
+    if (dimensionColumnPage.isNoDicitionaryColumn() && !dimensionColumnPage.isAdaptiveEncoded()) {
       updateForNoDictionaryColumn(startMin, endMax, dimensionColumnPage, bitSet);
     }
     return bitSet;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e570360/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 1e8d148..71447e9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -671,4 +671,20 @@ object CarbonScalaUtil {
     dataType == StringType
   }
 
+  /**
+   * Rearrange the column schema with all the sort columns at first. In case of ALTER ADD COLUMNS,
+   * if the newly added column is a sort column it will be at the last. But we expects all the
+   * SORT_COLUMNS always at first
+   *
+   * @param columnSchemas
+   * @return
+   */
+  def reArrangeColumnSchema(columnSchemas: mutable.Buffer[ColumnSchema]): mutable
+  .Buffer[ColumnSchema] = {
+    val newColumnSchemas = mutable.Buffer[ColumnSchema]()
+    newColumnSchemas ++= columnSchemas.filter(columnSchema => columnSchema.isSortColumn)
+    newColumnSchemas ++= columnSchemas.filterNot(columnSchema => columnSchema.isSortColumn)
+    newColumnSchemas
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e570360/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e1dd0af..e051456 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -301,6 +301,7 @@ class AlterTableColumnSchemaGenerator(
     val columnValidator = CarbonSparkFactory.getCarbonColumnValidator
     columnValidator.validateColumns(allColumns)
 
+    allColumns = CarbonScalaUtil.reArrangeColumnSchema(allColumns)
 
     def getLocalDictColumnList(tableProperties: scala.collection.mutable.Map[String, String],
         columns: scala.collection.mutable.ListBuffer[ColumnSchema]): (scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e570360/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 29cc6a9..6905aa2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -636,8 +636,9 @@ case class CarbonLoadDataCommand(
     CarbonSession.threadSet("partition.operationcontext", operationContext)
     // input data from csv files. Convert to logical plan
     val allCols = new ArrayBuffer[String]()
-    allCols ++= table.getAllDimensions.asScala.map(_.getColName)
-    allCols ++= table.getAllMeasures.asScala.map(_.getColName)
+    // get only the visible dimensions from table
+    allCols ++= table.getDimensionByTableName(table.getTableName).asScala.map(_.getColName)
+    allCols ++= table.getMeasureByTableName(table.getTableName).asScala.map(_.getColName)
     var attributes =
       StructType(
         allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e570360/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index e146f12..054a996 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -45,6 +45,8 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
     sql("drop table if exists test")
     sql("drop table if exists retructure_iud")
     sql("drop table if exists restructure_random_select")
+    sql("drop table if exists alterTable")
+    sql("drop table if exists alterPartitionTable")
 
     // clean data folder
     CarbonProperties.getInstance()
@@ -738,6 +740,42 @@ test("test alter command for boolean data type with correct default measure valu
     }
   }
 
+  test("load table after alter drop column scenario") {
+    sql("drop table if exists alterTable")
+    sql(
+      "create table alterTable(empno string, salary string) stored by 'carbondata' tblproperties" +
+      "('sort_columns'='')")
+    sql("alter table alterTable drop columns(empno)")
+    sql("alter table alterTable add columns(empno string)")
+    sql(s"load data local inpath '$resourcesPath/double.csv' into table alterTable options" +
+        s"('header'='true')")
+    checkAnswer(sql("select salary from alterTable limit 1"), Row(" 775678765456789098765432.789"))
+  }
+
+  test("load partition table after alter drop column scenario") {
+    val timestampFormat = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql("drop table if exists alterPartitionTable")
+    sql(
+      """
+        | CREATE TABLE alterPartitionTable (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='empname,deptno,projectcode,projectjoindate,
+        | projectenddate,attendance')
+      """.stripMargin)
+    sql("alter table alterPartitionTable drop columns(projectenddate)")
+    sql("alter table alterPartitionTable add columns(projectenddate timestamp)")
+    sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE alterPartitionTable OPTIONS('DELIMITER'= ',', " +
+              "'QUOTECHAR'= '\"')")
+    sql("select * from alterPartitionTable where empname='bill'").show(false)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS restructure")
     sql("drop table if exists table1")
@@ -756,5 +794,7 @@ test("test alter command for boolean data type with correct default measure valu
     sql("drop table if exists test")
     sql("drop table if exists retructure_iud")
     sql("drop table if exists restructure_random_select")
+    sql("drop table if exists alterTable")
+    sql("drop table if exists alterPartitionTable")
   }
 }