You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/09/21 16:25:42 UTC
carbondata git commit: [CARBONDATA-2950] Alter table add column on a Hive
table fails from Carbon for Spark versions above 2.1
Repository: carbondata
Updated Branches:
refs/heads/master f962e41b7 -> 8320918e5
[CARBONDATA-2950] Alter table add column on a Hive table fails from Carbon for Spark versions above 2.1
Problem:
Spark does not support ALTER TABLE ADD COLUMNS in Spark 2.1, but it is supported in Spark 2.2 and above.
When an add-column command is fired on a Hive table in a Carbon session with a Spark version above 2.1, it incorrectly throws an "unsupported operation on hive table" error.
Solution:
When ALTER TABLE ADD COLUMNS is fired on a Hive table with Spark 2.2 and above, it should not throw any exception; the operation should succeed.
This closes #2735
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8320918e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8320918e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8320918e
Branch: refs/heads/master
Commit: 8320918e55b393fedc946e4543843a72712d9199
Parents: f962e41
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Sep 19 19:51:39 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri Sep 21 21:55:06 2018 +0530
----------------------------------------------------------------------
.../sdv/generated/AlterTableTestCase.scala | 18 ++++++++++++-
.../lucene/LuceneFineGrainDataMapSuite.scala | 27 --------------------
.../org/apache/carbondata/spark/util/Util.java | 2 +-
.../spark/util/CarbonReflectionUtils.scala | 15 +++++++++++
.../sql/execution/strategy/DDLStrategy.scala | 21 +++++++++++++--
5 files changed, 52 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 4e53ea3..90fa602 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -18,12 +18,14 @@
package org.apache.carbondata.cluster.sdv.generated
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util._
-import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
@@ -1000,6 +1002,20 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""drop table if exists uniqdata59""").collect
}
+ test("Alter table add column for hive table for spark version above 2.1") {
+ sql("drop table if exists alter_hive")
+ sql("create table alter_hive(name string)")
+ if(SPARK_VERSION.startsWith("2.1")) {
+ val exception = intercept[MalformedCarbonCommandException] {
+ sql("alter table alter_hive add columns(add string)")
+ }
+ assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ sql("alter table alter_hive add columns(add string)")
+ sql("insert into alter_hive select 'abc','banglore'")
+ }
+ }
+
val prop = CarbonProperties.getInstance()
val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)
val p2 = prop.getProperty("carbon.horizontal.update.compaction.threshold", CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 0c6134b..2e3019a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -415,33 +415,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS datamap_test_table")
}
- test("test lucene fine grain data map with ALTER ADD and DROP Table COLUMN on Lucene DataMap") {
- sql("DROP TABLE IF EXISTS datamap_test_table")
- sql(
- """
- | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT)
- | STORED BY 'carbondata'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(
- s"""
- | CREATE DATAMAP dm2 ON TABLE datamap_test_table
- | USING 'lucene'
- | DMProperties('INDEX_COLUMNS'='name , city')
- """.stripMargin)
- val exception_add_column: Exception = intercept[MalformedCarbonCommandException] {
- sql("alter table dm2 add columns(city1 string)")
- }
- assert(exception_add_column.getMessage
- .contains("Unsupported alter operation on hive table"))
- val exception_drop_column: Exception = intercept[MalformedCarbonCommandException] {
- sql("alter table dm2 drop columns(name)")
- }
- assert(exception_drop_column.getMessage
- .contains("Unsupported alter operation on hive table"))
- sql("drop datamap if exists dm2 on table datamap_test_table")
- }
-
test("test Clean Files and check Lucene DataMap") {
sql("DROP TABLE IF EXISTS datamap_test_table")
sql(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index 832a1b2..d1193f5 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -59,7 +59,7 @@ public class Util {
return false;
}
- private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+ public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
DataType carbonDataType) {
if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
return DataTypes.StringType;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 32cd201..9955286 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -33,8 +33,10 @@ import org.apache.spark.sql.catalyst.parser.AstBuilder
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructField
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -301,6 +303,19 @@ object CarbonReflectionUtils {
}
}
+ /**
+ * method to invoke alter table add columns for hive table from carbon session
+ * @param table
+ * @param colsToAdd
+ * @return
+ */
+ def invokeAlterTableAddColumn(table: TableIdentifier,
+ colsToAdd: Seq[StructField]): Object = {
+ val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
+ CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
+ ._1.asInstanceOf[RunnableCommand]
+ }
+
def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 4499b19..f9046f0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,12 +32,14 @@ import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTa
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.Util
/**
* Carbon strategies for ddl commands
@@ -152,6 +154,21 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
ExecutedCommandExec(addColumn) :: Nil
}
+ // TODO: remove this else if check once the 2.1 version is unsupported by carbon
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
+ .map {
+ a =>
+ StructField(a.column,
+ Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
+ }
+ val identifier = TableIdentifier(
+ alterTableAddColumnsModel.tableName,
+ alterTableAddColumnsModel.databaseName)
+ ExecutedCommandExec(CarbonReflectionUtils
+ .invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
+ Nil
+ // TODO: remove this else check once the 2.1 version is unsupported by carbon
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}