You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/06 01:59:06 UTC

[flink] 01/02: [FLINK-27217][hive] Support partition filter push down when there exists default partition for Hive source.

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c54eaf51aad662dcfd187f370792edf15bb80b3c
Author: luoyuxia <lu...@alibaba-inc.com>
AuthorDate: Wed Apr 13 15:11:36 2022 +0800

    [FLINK-27217][hive] Support partition filter push down when there exists default partition for Hive source.
    
    This closes #19450
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 40 ++++++++++++++++++----
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  5 ++-
 .../flink/connectors/hive/HiveDialectITCase.java   | 21 ++++++++++++
 .../connectors/hive/HiveTableSourceITCase.java     | 16 +++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  6 +++-
 .../table/operations/ShowPartitionsOperation.java  | 17 +++++++++
 6 files changed, 96 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 093906c698c..c72ebbe8edc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -909,7 +909,14 @@ public class HiveCatalog extends AbstractCatalog {
                     .listPartitionNames(
                             tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1)
                     .stream()
-                    .map(HiveCatalog::createPartitionSpec)
+                    .map(
+                            p ->
+                                    createPartitionSpec(
+                                            p,
+                                            getHiveConf()
+                                                    .getVar(
+                                                            HiveConf.ConfVars
+                                                                    .DEFAULTPARTITIONNAME)))
                     .collect(Collectors.toList());
         } catch (TException e) {
             throw new CatalogException(
@@ -945,7 +952,14 @@ public class HiveCatalog extends AbstractCatalog {
                             partialVals,
                             (short) -1)
                     .stream()
-                    .map(HiveCatalog::createPartitionSpec)
+                    .map(
+                            p ->
+                                    createPartitionSpec(
+                                            p,
+                                            getHiveConf()
+                                                    .getVar(
+                                                            HiveConf.ConfVars
+                                                                    .DEFAULTPARTITIONNAME)))
                     .collect(Collectors.toList());
         } catch (TException e) {
             throw new CatalogException(
@@ -1139,21 +1153,28 @@ public class HiveCatalog extends AbstractCatalog {
 
     /**
      * Creates a {@link CatalogPartitionSpec} from a Hive partition name string. Example of Hive
-     * partition name string - "name=bob/year=2019"
+     * partition name string - "name=bob/year=2019". If the partition value for the given partition
+     * column is equal to {@code defaultPartitionName}, the partition value in the returned {@link
+     * CatalogPartitionSpec} will be null.
      */
-    private static CatalogPartitionSpec createPartitionSpec(String hivePartitionName) {
+    private static CatalogPartitionSpec createPartitionSpec(
+            String hivePartitionName, String defaultPartitionName) {
         String[] partKeyVals = hivePartitionName.split("/");
         Map<String, String> spec = new HashMap<>(partKeyVals.length);
         for (String keyVal : partKeyVals) {
             String[] kv = keyVal.split("=");
-            spec.put(unescapePathName(kv[0]), unescapePathName(kv[1]));
+            String partitionValue = unescapePathName(kv[1]);
+            spec.put(
+                    unescapePathName(kv[0]),
+                    partitionValue.equals(defaultPartitionName) ? null : partitionValue);
         }
         return new CatalogPartitionSpec(spec);
     }
 
     /**
      * Get a list of ordered partition values by re-arranging them based on the given list of
-     * partition keys.
+     * partition keys. If the partition value is null, it'll be converted into the default
+     * partition name.
      *
      * @param partitionSpec a partition spec.
      * @param partitionKeys a list of partition keys.
@@ -1177,7 +1198,11 @@ public class HiveCatalog extends AbstractCatalog {
                 throw new PartitionSpecInvalidException(
                         getName(), partitionKeys, tablePath, partitionSpec);
             } else {
-                values.add(spec.get(key));
+                String value = spec.get(key);
+                if (value == null) {
+                    value = getHiveConf().getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+                }
+                values.add(value);
             }
         }
 
@@ -1803,6 +1828,7 @@ public class HiveCatalog extends AbstractCatalog {
                                 listPartitions(
                                         new ObjectPath(
                                                 hiveTable.getDbName(), hiveTable.getTableName()))) {
+
                             Partition partition = getHivePartition(hiveTable, spec);
                             HiveTableUtil.alterColumns(partition.getSd(), catalogTable);
                             client.alter_partition(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 9874da8faf0..6730d5cde76 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -1738,7 +1738,10 @@ public class HiveParserDDLSemanticAnalyzer {
         if (partSpec != null && !partSpec.isEmpty()) {
             spec = new CatalogPartitionSpec(new HashMap<>(partSpec));
         }
-        return new ShowPartitionsOperation(tableIdentifier, spec);
+        return new ShowPartitionsOperation(
+                tableIdentifier,
+                spec,
+                HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
     }
 
     private Operation convertShowDatabases() {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index e7f21e6de8b..f3975885f23 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -971,6 +971,27 @@ public class HiveDialectITCase {
                                 .collect());
         assertThat(partitions).hasSize(1);
         assertThat(partitions.toString()).contains("dt=2020-04-30 01:02:03/country=china");
+        // show partitions for the table containing default partition
+        tableEnv.executeSql("create table tb1 (a string) partitioned by (c int)");
+        tableEnv.executeSql(
+                        "INSERT OVERWRITE TABLE tb1 PARTITION (c) values ('Col1', null), ('Col1', 5)")
+                .await();
+        String defaultPartitionName =
+                hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+
+        partitions =
+                CollectionUtil.iteratorToList(tableEnv.executeSql("show partitions tb1").collect());
+        assertThat(partitions.toString())
+                .isEqualTo(String.format("[+I[c=5], +I[c=%s]]", defaultPartitionName));
+        partitions =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql(
+                                        String.format(
+                                                "show partitions tb1 partition (c='%s')",
+                                                defaultPartitionName))
+                                .collect());
+        assertThat(partitions.toString())
+                .isEqualTo(String.format("[+I[c=%s]]", defaultPartitionName));
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 1b1e06834dc..dc98c4743f0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -218,6 +218,22 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         assertThat(rows).hasSize(2);
         Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
         assertThat(rowStrings).isEqualTo(new String[] {"+I[2014, 3, 0]", "+I[2014, 4, 0]"});
+
+        // test the case of pruning partitions when reading partitions from the catalog without a
+        // filter, and there exists a default partition
+        // insert null values for the partition column, which will fall into the default partition
+        batchTableEnv
+                .executeSql(
+                        "insert into source_db.test_table_pt_1 values ('2014', 1, null), ('2015', 2, null)")
+                .await();
+        // currently, the expression "is null" is not supported by
+        // HiveCatalog#listPartitionsByFilter, so the planner will list all partitions and then
+        // prune the partitions. The test is to cover such a case.
+        src =
+                batchTableEnv.sqlQuery(
+                        "select * from hive.source_db.test_table_pt_1 where pt is null");
+        rows = CollectionUtil.iteratorToList(src.execute().collect());
+        assertThat(rows.toString()).isEqualTo("[+I[2014, 1, null], +I[2015, 2, null]]");
     }
 
     @Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index df95c584654..7f1af0fcb8c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1298,7 +1298,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                     List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size());
                     for (Map.Entry<String, String> partitionKV :
                             spec.getPartitionSpec().entrySet()) {
-                        partitionKVs.add(partitionKV.getKey() + "=" + partitionKV.getValue());
+                        String partitionValue =
+                                partitionKV.getValue() == null
+                                        ? showPartitionsOperation.getDefaultPartitionName()
+                                        : partitionKV.getValue();
+                        partitionKVs.add(partitionKV.getKey() + "=" + partitionValue);
                     }
                     partitionNames.add(String.join("/", partitionKVs));
                 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
index c1e1c18d138..08a3ca7ea18 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
@@ -21,16 +21,29 @@ package org.apache.flink.table.operations;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import javax.annotation.Nullable;
+
 /** Operation to describe a SHOW PARTITIONS statement. */
 public class ShowPartitionsOperation implements ShowOperation {
 
     protected final ObjectIdentifier tableIdentifier;
     private final CatalogPartitionSpec partitionSpec;
+    // the name for the default partition, which usually means the partition's value is null or
+    // empty string
+    @Nullable private final String defaultPartitionName;
 
     public ShowPartitionsOperation(
             ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
+        this(tableIdentifier, partitionSpec, null);
+    }
+
+    public ShowPartitionsOperation(
+            ObjectIdentifier tableIdentifier,
+            CatalogPartitionSpec partitionSpec,
+            @Nullable String defaultPartitionName) {
         this.tableIdentifier = tableIdentifier;
         this.partitionSpec = partitionSpec;
+        this.defaultPartitionName = defaultPartitionName;
     }
 
     public ObjectIdentifier getTableIdentifier() {
@@ -41,6 +54,10 @@ public class ShowPartitionsOperation implements ShowOperation {
         return partitionSpec;
     }
 
+    public String getDefaultPartitionName() {
+        return defaultPartitionName;
+    }
+
     @Override
     public String asSummaryString() {
         StringBuilder builder =