You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/22 12:51:46 UTC

carbondata git commit: [CARBONDATA-3101] Fixed dataload failure when a column is dropped and added in partition table

Repository: carbondata
Updated Branches:
  refs/heads/master 79fc97a8d -> c5de10cb1


[CARBONDATA-3101] Fixed dataload failure when a column is dropped and added in partition table

Problem: Data load fails when a column in a partition table is dropped and re-added, because fieldConverters are created based on schemaOrdinal, and this order may differ from the order in which data is read from the CSV.
For example --> columns = nonparcolumn string, parcolumn int
Now if the user drops and re-adds the nonparcolumn column, the fieldConverters would be created in the following order:
fieldConverters[0] = MeasureFieldConverter
fieldConverters[1] = NonDictionaryFieldConverterImpl

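The data read from the CSV would be 'columndata', 1. The conversion for the non-partition column would fail in this case, because the string value 'columndata' is passed to fieldConverters[0], which is the MeasureFieldConverter.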

Solution: Place the partition columns last, both when doing alter add column and when creating fieldConverters.

This closes #2923


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c5de10cb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c5de10cb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c5de10cb

Branch: refs/heads/master
Commit: c5de10cb14495ced13af53c7a30dcaf805bf3b4f
Parents: 79fc97a
Author: kunal642 <ku...@gmail.com>
Authored: Thu Nov 15 10:10:00 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Nov 22 18:21:37 2018 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableQueryTestCase.scala        | 15 +++++++++++++++
 .../execution/command/carbonTableSchemaCommon.scala  |  5 +++++
 .../org/apache/spark/sql/hive/CarbonRelation.scala   |  9 ++++++++-
 .../loading/model/CarbonLoadModelBuilder.java        | 15 +++++++++++----
 4 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5de10cb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index eaf5e4b..8107cd5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -437,6 +437,20 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop datamap if exists preaggTable on table partitionTable")
   }
 
+  test("validate data in partition table after dropping and adding a column") {
+    sql("drop table if exists par")
+    sql("create table par(name string) partitioned by (age double) stored by " +
+              "'carbondata'")
+    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
+        s"('header'='false')")
+    sql("alter table par drop columns(name)")
+    sql("alter table par add columns(name string)")
+    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
+        s"('header'='false')")
+    checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), Row(null)))
+    sql("drop table if exists par")
+  }
+
 
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
@@ -473,6 +487,7 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionlocloadother")
     sql("drop table if exists staticpartitionextlocload_new")
     sql("drop table if exists staticpartitionlocloadother_new")
+    sql("drop table if exists par")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5de10cb/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index f5149e8..745eee7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -302,6 +302,11 @@ class AlterTableColumnSchemaGenerator(
 
     allColumns = CarbonScalaUtil.reArrangeColumnSchema(allColumns)
 
+    if (tableInfo.getFactTable.getPartitionInfo != null) {
+      val par = tableInfo.getFactTable.getPartitionInfo.getColumnSchemaList
+      allColumns = allColumns.filterNot(b => par.contains(b)) ++= par.asScala
+    }
+
     def getLocalDictColumnList(tableProperties: scala.collection.mutable.Map[String, String],
         columns: scala.collection.mutable.ListBuffer[ColumnSchema]): (scala.collection.mutable
     .ListBuffer[ColumnSchema], scala.collection.mutable.ListBuffer[ColumnSchema]) = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5de10cb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 0c3b7dc..ad3eb72 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -98,8 +98,15 @@ case class CarbonRelation(
   override val output = {
     val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
       .asScala
+    val partitionColumnSchemas = if (carbonTable.getPartitionInfo() != null) {
+      carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+    } else {
+      Nil
+    }
+    val otherColumns = columns.filterNot(a => partitionColumnSchemas.contains(a.getColumnSchema))
+    val partitionColumns = columns.filter(a => partitionColumnSchemas.contains(a.getColumnSchema))
     // convert each column to Attribute
-    columns.filter(!_.isInvisible).map { column: CarbonColumn =>
+    (otherColumns ++= partitionColumns).filter(!_.isInvisible).map { column: CarbonColumn =>
       if (column.isDimension()) {
         val output: DataType = column.getDataType.getName.toLowerCase match {
           case "array" =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5de10cb/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index f69f0af..4a29304 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -206,11 +206,18 @@ public class CarbonLoadModelBuilder {
       } else {
         if (StringUtils.isEmpty(fileHeader)) {
           List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
-          String[] columnNames = new String[columns.size()];
-          for (int i = 0; i < columnNames.length; i++) {
-            columnNames[i] = columns.get(i).getColName();
+          List<String> columnNames = new ArrayList<>();
+          List<String> partitionColumns = new ArrayList<>();
+          for (int i = 0; i < columns.size(); i++) {
+            if (table.getPartitionInfo() != null && table.getPartitionInfo().getColumnSchemaList()
+                .contains(columns.get(i).getColumnSchema())) {
+              partitionColumns.add(columns.get(i).getColName());
+            } else {
+              columnNames.add(columns.get(i).getColName());
+            }
           }
-          fileHeader = Strings.mkString(columnNames, ",");
+          columnNames.addAll(partitionColumns);
+          fileHeader = Strings.mkString(columnNames.toArray(new String[columnNames.size()]), ",");
         }
       }
     }