You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/05/13 14:25:19 UTC
[carbondata] branch master updated: [CARBONDATA-3817]Fix table
creation with all columns as partition columns
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c726af2 [CARBONDATA-3817]Fix table creation with all columns as partition columns
c726af2 is described below
commit c726af2efadc7108886c8bd0ef61bfaeb2a0fc44
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon May 11 18:44:13 2020 +0530
[CARBONDATA-3817]Fix table creation with all columns as partition columns
Why is this PR needed?
When all the columns are given as partition columns during create table,
the create table should fail, because at least one column must be present as a
non-partition column. Currently it does not fail: after #3574, we improved the
create data source table flow and now call Spark's CreateDataSourceTableCommand.
Since we create the table as a Hive table, if creating it in the Hive-compatible
way fails, Spark falls back to saving its metadata in the Spark SQL
specific way. So the partition validation fails when we try to store the table in
the Hive-compatible way, but the retry passes, which is wrong behavior for a Hive-compatible table.
Also, in the Hive integration, the table location does not have the file system URI scheme added to it.
What changes were proposed in this PR?
For a partition table, validate the partition columns with the same API that
Spark uses, so that creation fails if all the columns are given as partition
columns. Also append the file system URI scheme to the location parameter while
inferring the schema.
This closes #3762
---
.../org/apache/carbondata/hive/CarbonHiveSerDe.java | 4 +++-
.../table/CarbonCreateDataSourceTableCommand.scala | 10 ++++++++++
.../StandardPartitionTableLoadingTestCase.scala | 17 +++++++++++++++--
3 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 99b59b3..b955655 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.hadoop.conf.Configuration;
@@ -115,7 +116,8 @@ public class CarbonHiveSerDe extends AbstractSerDe {
private void inferSchema(Properties tbl, List<String> columnNames, List<TypeInfo> columnTypes) {
if (columnNames.size() == 0 && columnTypes.size() == 0) {
String external = tbl.getProperty("EXTERNAL");
- String location = tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+ String location = CarbonUtil.checkAndAppendFileSystemURIScheme(
+ tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
if (external != null && "TRUE".equals(external) && location != null) {
String[] names =
tbl.getProperty(hive_metastoreConstants.META_TABLE_NAME).split("\\.");
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
index c94835f..4628978 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, Sp
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -66,7 +67,16 @@ case class CarbonCreateDataSourceTableCommand(
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
val (tableInfo, catalogTable) = CarbonSource.createTableMeta(sparkSession, table, metaStore)
+ // Since we are creating as Hive table, if while creating hive compatible way, if it fails,then
+ // it will fall back to save its metadata in the Spark SQL specific way, so partition validation
+ // fails when we try to store in hive compatible way, so in retry it might pass, so doing the
+ // partition validation here only.
+ // Refer: org.apache.spark.sql.hive.HiveExternalCatalog.scala#createDataSourceTable
+ val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ PartitioningUtils.validatePartitionColumn(catalogTable.schema,
+ catalogTable.partitionColumnNames,
+ caseSensitiveAnalysis)
val rows = try {
CreateDataSourceTableCommand(
catalogTable,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 796dd50..e23d375 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.spark.testsuite.standardpartition
import java.io.{File, FileWriter, IOException}
import java.util
-import java.util.concurrent.{Callable, Executors, ExecutorService}
+import java.util.concurrent.{Callable, ExecutorService, Executors}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.Strings
@@ -553,6 +553,19 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
Row("empno=99/empname=ravi/designation=xx")))
}
+ test("test create partition table with all the columns as partition columns") {
+ sql("drop table if exists partitionall_columns")
+ val ex = intercept[AnalysisException] {
+ sql(
+ """
+ | CREATE TABLE partitionall_columns
+ | PARTITIONED BY (empno int,empname String, designation String)
+ | STORED AS carbondata
+ """.stripMargin)
+ }
+ assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for partition columns;"))
+ }
+
def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
sql(s"drop table if exists $tableName")
sql(