You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/30 16:34:04 UTC

[11/26] carbondata git commit: [CARBONDATA-3101] Fixed dataload failure when a column is dropped and added in partition table

[CARBONDATA-3101] Fixed dataload failure when a column is dropped and added in partition table

Problem: Data load fails when a column in a partition table is dropped and re-added, because fieldConverters are created in schemaOrdinal order, which may differ from the order in which the data is read from the CSV.
For example --> columns = nonparcolumn string, parcolumn int
Now if the user drops and re-adds the column nonparcolumn, the fieldConverters would be created in the following order:
fieldConverters[0] = MeasureFieldConverter
fieldConverters[1] = NonDictionaryFieldConverterImpl

The data read from the CSV would be 'columndata', 1. The conversion for the non-partition column would fail in this case, because the string value 'columndata' is handed to the MeasureFieldConverter at index 0.

Solution: Place the partition columns last, both when handling alter add column and when creating fieldConverters.

This closes #2923


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/07c283d5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/07c283d5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/07c283d5

Branch: refs/heads/branch-1.5
Commit: 07c283d594a964898a28b6c9c5619fe55b30834a
Parents: 4043b10
Author: kunal642 <ku...@gmail.com>
Authored: Thu Nov 15 10:10:00 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 30 21:55:29 2018 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableQueryTestCase.scala        | 15 +++++++++++++++
 .../execution/command/carbonTableSchemaCommon.scala  |  5 +++++
 .../org/apache/spark/sql/hive/CarbonRelation.scala   |  9 ++++++++-
 .../loading/model/CarbonLoadModelBuilder.java        | 15 +++++++++++----
 4 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/07c283d5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index eaf5e4b..8107cd5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -437,6 +437,20 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop datamap if exists preaggTable on table partitionTable")
   }
 
+  test("validate data in partition table after dropping and adding a column") {
+    sql("drop table if exists par")
+    sql("create table par(name string) partitioned by (age double) stored by " +
+              "'carbondata'")
+    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
+        s"('header'='false')")
+    sql("alter table par drop columns(name)")
+    sql("alter table par add columns(name string)")
+    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
+        s"('header'='false')")
+    checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), Row(null)))
+    sql("drop table if exists par")
+  }
+
 
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
@@ -473,6 +487,7 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionlocloadother")
     sql("drop table if exists staticpartitionextlocload_new")
     sql("drop table if exists staticpartitionlocloadother_new")
+    sql("drop table if exists par")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07c283d5/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index f5149e8..745eee7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -302,6 +302,11 @@ class AlterTableColumnSchemaGenerator(
 
     allColumns = CarbonScalaUtil.reArrangeColumnSchema(allColumns)
 
+    if (tableInfo.getFactTable.getPartitionInfo != null) {
+      val par = tableInfo.getFactTable.getPartitionInfo.getColumnSchemaList
+      allColumns = allColumns.filterNot(b => par.contains(b)) ++= par.asScala
+    }
+
     def getLocalDictColumnList(tableProperties: scala.collection.mutable.Map[String, String],
         columns: scala.collection.mutable.ListBuffer[ColumnSchema]): (scala.collection.mutable
     .ListBuffer[ColumnSchema], scala.collection.mutable.ListBuffer[ColumnSchema]) = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07c283d5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 0c3b7dc..ad3eb72 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -98,8 +98,15 @@ case class CarbonRelation(
   override val output = {
     val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
       .asScala
+    val partitionColumnSchemas = if (carbonTable.getPartitionInfo() != null) {
+      carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+    } else {
+      Nil
+    }
+    val otherColumns = columns.filterNot(a => partitionColumnSchemas.contains(a.getColumnSchema))
+    val partitionColumns = columns.filter(a => partitionColumnSchemas.contains(a.getColumnSchema))
     // convert each column to Attribute
-    columns.filter(!_.isInvisible).map { column: CarbonColumn =>
+    (otherColumns ++= partitionColumns).filter(!_.isInvisible).map { column: CarbonColumn =>
       if (column.isDimension()) {
         val output: DataType = column.getDataType.getName.toLowerCase match {
           case "array" =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07c283d5/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index f69f0af..4a29304 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -206,11 +206,18 @@ public class CarbonLoadModelBuilder {
       } else {
         if (StringUtils.isEmpty(fileHeader)) {
           List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
-          String[] columnNames = new String[columns.size()];
-          for (int i = 0; i < columnNames.length; i++) {
-            columnNames[i] = columns.get(i).getColName();
+          List<String> columnNames = new ArrayList<>();
+          List<String> partitionColumns = new ArrayList<>();
+          for (int i = 0; i < columns.size(); i++) {
+            if (table.getPartitionInfo() != null && table.getPartitionInfo().getColumnSchemaList()
+                .contains(columns.get(i).getColumnSchema())) {
+              partitionColumns.add(columns.get(i).getColName());
+            } else {
+              columnNames.add(columns.get(i).getColName());
+            }
           }
-          fileHeader = Strings.mkString(columnNames, ",");
+          columnNames.addAll(partitionColumns);
+          fileHeader = Strings.mkString(columnNames.toArray(new String[columnNames.size()]), ",");
         }
       }
     }