You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/09/21 16:25:42 UTC

carbondata git commit: [CARBONDATA-2950]alter add column of hive table fails from carbon for spark versions above 2.1

Repository: carbondata
Updated Branches:
  refs/heads/master f962e41b7 -> 8320918e5


[CARBONDATA-2950]alter add column of hive table fails from carbon for spark versions above 2.1

Problem:
spark does not support add columns in spark-2.1, but it is supported in 2.2 and above
when add column is fired for hive table in carbon session, for spark -version above 2.1, it throws error as unsupported operation on hive table

Solution:
when alter add columns for hive is fired for spark-2.2 and above, it should not throw any exception and it should pass

This closes #2735


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8320918e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8320918e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8320918e

Branch: refs/heads/master
Commit: 8320918e55b393fedc946e4543843a72712d9199
Parents: f962e41
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Sep 19 19:51:39 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri Sep 21 21:55:06 2018 +0530

----------------------------------------------------------------------
 .../sdv/generated/AlterTableTestCase.scala      | 18 ++++++++++++-
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 27 --------------------
 .../org/apache/carbondata/spark/util/Util.java  |  2 +-
 .../spark/util/CarbonReflectionUtils.scala      | 15 +++++++++++
 .../sql/execution/strategy/DDLStrategy.scala    | 21 +++++++++++++--
 5 files changed, 52 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 4e53ea3..90fa602 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -18,12 +18,14 @@
 
 package org.apache.carbondata.cluster.sdv.generated
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util._
-import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -1000,6 +1002,20 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
      sql(s"""drop table  if exists uniqdata59""").collect
   }
 
+  test("Alter table add column for hive table for spark version above 2.1") {
+    sql("drop table if exists alter_hive")
+    sql("create table alter_hive(name string)")
+    if(SPARK_VERSION.startsWith("2.1")) {
+      val exception = intercept[MalformedCarbonCommandException] {
+        sql("alter table alter_hive add columns(add string)")
+      }
+      assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      sql("alter table alter_hive add columns(add string)")
+      sql("insert into alter_hive select 'abc','banglore'")
+    }
+  }
+
   val prop = CarbonProperties.getInstance()
   val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)
   val p2 = prop.getProperty("carbon.horizontal.update.compaction.threshold", CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 0c6134b..2e3019a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -415,33 +415,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamap_test_table")
   }
 
-  test("test lucene fine grain data map with ALTER ADD and DROP Table COLUMN on Lucene DataMap") {
-    sql("DROP TABLE IF EXISTS datamap_test_table")
-    sql(
-      """
-        | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'carbondata'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
-      """.stripMargin)
-    sql(
-      s"""
-         | CREATE DATAMAP dm2 ON TABLE datamap_test_table
-         | USING 'lucene'
-         | DMProperties('INDEX_COLUMNS'='name , city')
-      """.stripMargin)
-    val exception_add_column: Exception = intercept[MalformedCarbonCommandException] {
-      sql("alter table dm2 add columns(city1 string)")
-    }
-    assert(exception_add_column.getMessage
-      .contains("Unsupported alter operation on hive table"))
-    val exception_drop_column: Exception = intercept[MalformedCarbonCommandException] {
-      sql("alter table dm2 drop columns(name)")
-    }
-    assert(exception_drop_column.getMessage
-      .contains("Unsupported alter operation on hive table"))
-    sql("drop datamap if exists dm2 on table datamap_test_table")
-  }
-
   test("test Clean Files and check Lucene DataMap") {
     sql("DROP TABLE IF EXISTS datamap_test_table")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index 832a1b2..d1193f5 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -59,7 +59,7 @@ public class Util {
     return false;
   }
 
-  private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+  public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
       DataType carbonDataType) {
     if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
       return DataTypes.StringType;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 32cd201..9955286 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -33,8 +33,10 @@ import org.apache.spark.sql.catalyst.parser.AstBuilder
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructField
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
@@ -301,6 +303,19 @@ object CarbonReflectionUtils {
     }
   }
 
+  /**
+   * method to invoke alter table add columns for hive table from carbon session
+   * @param table
+   * @param colsToAdd
+   * @return
+   */
+  def invokeAlterTableAddColumn(table: TableIdentifier,
+      colsToAdd: Seq[StructField]): Object = {
+    val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
+    CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
+      ._1.asInstanceOf[RunnableCommand]
+  }
+
   def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
     val clazz = Utils.classForName(className)
     val ctor = clazz.getConstructors.head

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 4499b19..f9046f0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,12 +32,14 @@ import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTa
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
 import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.Util
 
   /**
    * Carbon strategies for ddl commands
@@ -152,6 +154,21 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           } else {
             ExecutedCommandExec(addColumn) :: Nil
           }
+          // TODO: remove this else if check once the 2.1 version is unsupported by carbon
+        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+          val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
+            .map {
+              a =>
+                StructField(a.column,
+                  Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
+            }
+          val identifier = TableIdentifier(
+            alterTableAddColumnsModel.tableName,
+            alterTableAddColumnsModel.databaseName)
+          ExecutedCommandExec(CarbonReflectionUtils
+            .invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
+          Nil
+          // TODO: remove this else check once the 2.1 version is unsupported by carbon
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }