Posted to commits@impala.apache.org by st...@apache.org on 2021/04/14 01:00:33 UTC

[impala] branch master updated: IMPALA-9732: Improve exceptions of unsupported HdfsTableSink formats

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b16df9  IMPALA-9732: Improve exceptions of unsupported HdfsTableSink formats
6b16df9 is described below

commit 6b16df9e9a4696b46b6f9c7fe2fc0aaded285623
Author: Tamas Mate <tm...@cloudera.com>
AuthorDate: Sat Apr 10 11:30:29 2021 +0200

    IMPALA-9732: Improve exceptions of unsupported HdfsTableSink formats
    
    This change updates the exception that is thrown when the user tries to
    insert into a partition that has an unsupported file format. The
    information needed for this decision is already available during
    analysis, so this commit also moves the check from the planner to the
    analyzer so that the error is reported earlier.
    
    In the analyzer only FeFsTables have to be checked, so Kudu tables are
    not affected. There is also a difference between static and dynamic
    partition clauses: for a static partition clause the partition format is
    known at compile time, while for a dynamic partition clause it is only
    known at runtime.
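    
    As a rough illustration, the two cases surface to the user as follows
    (the statements and error texts mirror the new analyzer tests and the
    functional.multipartformat test table added in this change):
    
      -- Static partition clause: the ORC partition is named directly, so
      -- the analyzer can reject the write up front.
      INSERT INTO functional.multipartformat PARTITION (p='orc') SELECT 1;
      -- AnalysisException: Writing the destination partition format 'ORC'
      -- is not supported.
    
      -- Dynamic partition clause: the target partition is only known at
      -- runtime, so all file formats present in the table are checked.
      INSERT INTO functional.multipartformat PARTITION (p) SELECT 'parquet', 1;
      -- AnalysisException: Destination table 'functional.multipartformat'
      -- contains partition format(s) that are not supported to write:
      -- 'ORC', dynamic partition clauses are forbidden.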
    
    Testing:
     - Added unit tests
     - Ran exhaustive tests successfully
    
    Change-Id: I7fa2f949336a422acb4d01c9347b9b2e808e4aec
    Reviewed-on: http://gerrit.cloudera.org:8080/17300
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/InsertStmt.java     | 59 +++++++++++++++++-----
 .../org/apache/impala/planner/HdfsTableSink.java   |  8 +--
 .../apache/impala/analysis/AnalyzeStmtsTest.java   | 11 ++++
 .../functional/functional_schema_template.sql      | 17 +++++++
 4 files changed, 76 insertions(+), 19 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index a6a83d1..8c94aea 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -27,20 +27,25 @@ import org.apache.iceberg.types.Types;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.BuiltinsDb;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.KuduColumn;
+import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.HdfsTableSink;
 import org.apache.impala.planner.TableSink;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
@@ -50,6 +55,7 @@ import org.apache.impala.util.IcebergUtil;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 /**
  * Representation of a single insert or upsert statement, including the select statement
@@ -297,6 +303,13 @@ public class InsertStmt extends StatementBase {
       selectListExprs = new ArrayList<>();
     }
 
+    // Make sure static partition key values only contain const exprs.
+    if (partitionKeyValues_ != null) {
+      for (PartitionKeyValue kv: partitionKeyValues_) {
+        kv.analyze(analyzer);
+      }
+    }
+
     // Set target table and perform table-type specific analysis and auth checking.
     // Also checks if the target table is missing.
     analyzeTargetTable(analyzer);
@@ -389,13 +402,6 @@ public class InsertStmt extends StatementBase {
     checkColumnCoverage(selectExprTargetColumns, mentionedColumnNames,
         selectListExprs.size(), numStaticPartitionExprs);
 
-    // Make sure static partition key values only contain const exprs.
-    if (partitionKeyValues_ != null) {
-      for (PartitionKeyValue kv: partitionKeyValues_) {
-        kv.analyze(analyzer);
-      }
-    }
-
     // Check that we can write to the target table/partition. This must be
     // done after the partition expression has been analyzed above.
     analyzeWriteAccess();
@@ -547,6 +553,33 @@ public class InsertStmt extends StatementBase {
               targetTableName_));
         }
       }
+      // Check if the target partition is supported to write. For static partitioning the
+      // file format is available on partition level, however for dynamic partitioning
+      // the target partition formats are unknown during analysis and planning, therefore
+      // it will be verified based on the table metadata.
+      if (isStaticPartitionTarget()) {
+        PrunablePartition partition =
+            HdfsTable.getPartition(fsTable, partitionKeyValues_);
+        if (partition != null && partition instanceof FeFsPartition) {
+          HdfsFileFormat fileFormat = ((FeFsPartition) partition).getFileFormat();
+          Boolean notSupported =
+              !HdfsTableSink.SUPPORTED_FILE_FORMATS.contains(fileFormat);
+          if (notSupported) {
+            throw new AnalysisException(String.format("Writing the destination " +
+                "partition format '" + fileFormat + "' is not supported."));
+          }
+        }
+      } else {
+        Set<HdfsFileFormat> formats = fsTable.getFileFormats();
+        Set<HdfsFileFormat> unsupportedFormats =
+            Sets.difference(formats, HdfsTableSink.SUPPORTED_FILE_FORMATS);
+        if (!unsupportedFormats.isEmpty()) {
+          throw new AnalysisException(String.format("Destination table '" +
+              fsTable.getFullName() + "' contains partition format(s) that are not " +
+              "supported to write: '" + Joiner.on(',').join(unsupportedFormats) + "', " +
+              "dynamic partition clauses are forbidden."));
+        }
+      }
     }
 
     if (table_ instanceof FeKuduTable) {
@@ -600,15 +633,17 @@ public class InsertStmt extends StatementBase {
     if (!(table_ instanceof FeFsTable)) return;
     FeFsTable fsTable = (FeFsTable) table_;
 
+    // If the partition target is fully static, then check for write access against
+    // the specific partition. Otherwise, check the whole table.
     FeFsTable.Utils.checkWriteAccess(fsTable,
-        hasStaticPartitionTarget() ? partitionKeyValues_ : null, "INSERT");
+        isStaticPartitionTarget() ? partitionKeyValues_ : null, "INSERT");
   }
 
-  private boolean hasStaticPartitionTarget() {
+  /**
+   * Returns true if all the partition key values of the target table are static.
+   */
+  private boolean isStaticPartitionTarget() {
     if (partitionKeyValues_ == null) return false;
-
-    // If the partition target is fully static, then check for write access against
-    // the specific partition. Otherwise, check the whole table.
     for (PartitionKeyValue pkv : partitionKeyValues_) {
       if (pkv.isDynamic()) return false;
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 96a699d..0fc4e96 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -65,7 +65,7 @@ public class HdfsTableSink extends TableSink {
   // Indicates that this sink is being used to write query results
   protected final boolean isResultSink_;
 
-  private static final Set<HdfsFileFormat> SUPPORTED_FILE_FORMATS = ImmutableSet.of(
+  public static final Set<HdfsFileFormat> SUPPORTED_FILE_FORMATS = ImmutableSet.of(
       HdfsFileFormat.PARQUET, HdfsFileFormat.TEXT, HdfsFileFormat.RC_FILE,
       HdfsFileFormat.SEQUENCE_FILE, HdfsFileFormat.AVRO, HdfsFileFormat.ICEBERG);
 
@@ -170,12 +170,6 @@ public class HdfsTableSink extends TableSink {
    * set of file formats.
    */
   private long getPerPartitionMemReq(Set<HdfsFileFormat> formats) {
-    Set<HdfsFileFormat> unsupportedFormats =
-        Sets.difference(formats, SUPPORTED_FILE_FORMATS);
-    if (!unsupportedFormats.isEmpty()) {
-      Preconditions.checkState(false,
-          "Unsupported TableSink format(s): " + Joiner.on(',').join(unsupportedFormats));
-    }
     if (formats.contains(HdfsFileFormat.PARQUET)) {
       // Writing to a Parquet partition requires up to 1GB of buffer. From a resource
       // management purview, even if there are non-Parquet partitions, we want to be
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 402077a..7d6c8d1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4156,6 +4156,12 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, " +
         "float_col, double_col, date_string_col, string_col, timestamp_col, year, " +
         "month from functional.alltypes");
+
+    // Insert with dynamic partitioning clause into a table that has unsupported partition
+    AnalysisError("insert into table functional.multipartformat partition (p) " +
+       "select 'parquet', 1", "Destination table 'functional.multipartformat' " +
+       "contains partition format(s) that are not supported to write: 'ORC', " +
+       "dynamic partition clauses are forbidden.");
   }
 
   /**
@@ -4351,6 +4357,11 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "float_col, double_col, date_string_col, string_col, timestamp_col " +
         "from functional.alltypes");
 
+    // Insert with static partitioning clause into a partition that has unsupported
+    // partition format.
+    AnalysisError("insert into table functional.multipartformat partition (p='orc') " +
+        "select 1", "Writing the destination partition format 'ORC' is not supported.");
+
     if (qualifier.contains("OVERWRITE")) {
       AnalysisError("insert " + qualifier + " table functional_hbase.alltypessmall " +
           "partition(year, month) select * from functional.alltypessmall",
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index c00cd88..6fafd3f 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -837,6 +837,23 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT c.* FROM functio
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+multipartformat
+---- CREATE_HIVE
+-- Used to test dynamic and static insert into partitioned tables which contains
+-- supported and unsupported file formats.
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (id int)
+  PARTITIONED BY (p string);
+---- LOAD
+ALTER TABLE {db_name}{db_suffix}.{table_name} ADD PARTITION (p='parquet');
+ALTER TABLE {db_name}{db_suffix}.{table_name} ADD PARTITION (p='orc');
+ALTER TABLE {db_name}{db_suffix}.{table_name} PARTITION (p='parquet')
+  SET FILEFORMAT PARQUET;
+ALTER TABLE {db_name}{db_suffix}.{table_name} PARTITION (p='orc')
+  SET FILEFORMAT ORC;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 complextypes_fileformat
 ---- CREATE_HIVE
 -- Used for positive/negative testing of complex types on various file formats.