Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/05/13 14:25:19 UTC

[carbondata] branch master updated: [CARBONDATA-3817]Fix table creation with all columns as partition columns

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c726af2  [CARBONDATA-3817]Fix table creation with all columns as partition columns
c726af2 is described below

commit c726af2efadc7108886c8bd0ef61bfaeb2a0fc44
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon May 11 18:44:13 2020 +0530

    [CARBONDATA-3817]Fix table creation with all columns as partition columns
    
    Why is this PR needed?
    When every column is declared as a partition column during CREATE TABLE, the
    statement should fail, because at least one column must remain as a
    non-partition column. After #3574 the create flow for data source tables was
    improved to call Spark's CreateDataSourceTableCommand. Since the table is
    created as a Hive table, a failure while storing the metadata in the
    Hive-compatible way makes Spark fall back to saving it in the Spark SQL
    specific way. The partition validation fails on the Hive-compatible attempt,
    but the retry succeeds, which is the wrong behavior for a Hive-compatible
    table. In addition, in the Hive integration the table location does not have
    a file system URI scheme prepended to it.
    
    What changes were proposed in this PR?
    For a partition table, if all the columns are declared as partition columns,
    validate the schema with the same API that Spark uses, so the CREATE TABLE
    fails up front. Also append the file system URI scheme to the location
    property while inferring the schema in the Hive SerDe.
    
    This closes #3762
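
For context, a minimal sketch of the scenario this commit rejects, assuming a
SparkSession already configured with the CarbonData extensions (the table name
is illustrative, and the expected message mirrors the test added below):

  import org.apache.spark.sql.{AnalysisException, SparkSession}

  val spark: SparkSession = SparkSession.builder().getOrCreate()

  try {
    // Every column is listed as a partition column, so no regular data
    // column remains in the schema.
    spark.sql(
      """
        | CREATE TABLE all_partition_cols
        | PARTITIONED BY (empno INT, empname STRING, designation STRING)
        | STORED AS carbondata
      """.stripMargin)
  } catch {
    // After this commit the statement fails analysis up front instead of
    // slipping through on the Spark-specific metadata retry.
    case e: AnalysisException =>
      assert(e.getMessage.toLowerCase.contains(
        "cannot use all columns for partition columns"))
  }
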
---
 .../org/apache/carbondata/hive/CarbonHiveSerDe.java     |  4 +++-
 .../table/CarbonCreateDataSourceTableCommand.scala      | 10 ++++++++++
 .../StandardPartitionTableLoadingTestCase.scala         | 17 +++++++++++++++--
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 99b59b3..b955655 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.conf.Configuration;
@@ -115,7 +116,8 @@ public class CarbonHiveSerDe extends AbstractSerDe {
   private void inferSchema(Properties tbl, List<String> columnNames, List<TypeInfo> columnTypes) {
     if (columnNames.size() == 0 && columnTypes.size() == 0) {
       String external = tbl.getProperty("EXTERNAL");
-      String location = tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+      String location = CarbonUtil.checkAndAppendFileSystemURIScheme(
+          tbl.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
       if (external != null && "TRUE".equals(external) && location != null) {
         String[] names =
             tbl.getProperty(hive_metastoreConstants.META_TABLE_NAME).split("\\.");
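
The intent of the SerDe change is that a metastore location without a scheme
gets the default file system URI prepended before schema inference. The diff
only shows the call site, so the helper below is a hypothetical stand-in for
CarbonUtil.checkAndAppendFileSystemURIScheme, not its actual implementation:

  import java.net.URI
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileSystem

  // Illustrative only: if the location has no scheme (e.g.
  // "/user/hive/warehouse/t1"), prefix it with the default file system URI so
  // downstream readers resolve it against the right file system.
  def withFileSystemScheme(location: String, conf: Configuration): String = {
    if (location == null || new URI(location).getScheme != null) {
      location
    } else {
      FileSystem.getDefaultUri(conf).toString.stripSuffix("/") + location
    }
  }

  // e.g. withFileSystemScheme("/user/hive/warehouse/t1", new Configuration())
  // would yield something like "hdfs://<namenode>/user/hive/warehouse/t1"
  // when fs.defaultFS points at HDFS.
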
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
index c94835f..4628978 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, Sp
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 
@@ -66,7 +67,16 @@ case class CarbonCreateDataSourceTableCommand(
 
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val (tableInfo, catalogTable) = CarbonSource.createTableMeta(sparkSession, table, metaStore)
+    // Since the table is created as a Hive table, a failure while storing it in the Hive-compatible
+    // way makes Spark fall back to saving its metadata in the Spark SQL specific way. Partition
+    // validation fails on the Hive-compatible attempt but might pass on the retry, so perform the
+    // partition validation here up front.
+    // Refer: org.apache.spark.sql.hive.HiveExternalCatalog.scala#createDataSourceTable
 
+    val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(catalogTable.schema,
+      catalogTable.partitionColumnNames,
+      caseSensitiveAnalysis)
     val rows = try {
       CreateDataSourceTableCommand(
         catalogTable,
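
The check delegated to Spark here, PartitioningUtils.validatePartitionColumn,
rejects a table whose partition columns cover the entire schema. Since that
utility is package-private to Spark SQL, the sketch below is only an
illustrative equivalent of the rule being enforced, not Spark's code; Spark
itself raises an AnalysisException with the same message:

  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  // At least one column must remain as a regular (non-partition) data column.
  def validatePartitionColumns(schema: StructType,
                               partitionColumns: Seq[String],
                               caseSensitive: Boolean): Unit = {
    val resolve: (String, String) => Boolean =
      if (caseSensitive) (a, b) => a == b
      else (a, b) => a.equalsIgnoreCase(b)
    val dataColumns =
      schema.filterNot(field => partitionColumns.exists(resolve(_, field.name)))
    require(dataColumns.nonEmpty, "Cannot use all columns for partition columns")
  }

  // All three columns are partitioned on, so this fails the validation.
  val schema = StructType(Seq(
    StructField("empno", IntegerType),
    StructField("empname", StringType),
    StructField("designation", StringType)))
  validatePartitionColumns(schema, Seq("empno", "empname", "designation"),
    caseSensitive = false)
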
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 796dd50..e23d375 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.spark.testsuite.standardpartition
 
 import java.io.{File, FileWriter, IOException}
 import java.util
-import java.util.concurrent.{Callable, Executors, ExecutorService}
+import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.Strings
@@ -553,6 +553,19 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       Row("empno=99/empname=ravi/designation=xx")))
   }
 
+  test("test create partition table with all the columns as partition columns") {
+    sql("drop table if exists partitionall_columns")
+    val ex = intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE TABLE partitionall_columns
+          | PARTITIONED BY (empno int,empname String, designation String)
+          | STORED AS carbondata
+      """.stripMargin)
+    }
+    assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for partition columns;"))
+  }
+
   def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
     sql(s"drop table if exists $tableName")
     sql(