You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/06 01:59:06 UTC
[flink] 01/02: [FLINK-27217][hive] Support partition filter push down when there exists default partition for Hive source.
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c54eaf51aad662dcfd187f370792edf15bb80b3c
Author: luoyuxia <lu...@alibaba-inc.com>
AuthorDate: Wed Apr 13 15:11:36 2022 +0800
[FLINK-27217][hive] Support partition filter push down when there exists default partition for Hive source.
This closes #19450
---
.../flink/table/catalog/hive/HiveCatalog.java | 40 ++++++++++++++++++----
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 5 ++-
.../flink/connectors/hive/HiveDialectITCase.java | 21 ++++++++++++
.../connectors/hive/HiveTableSourceITCase.java | 16 +++++++++
.../table/api/internal/TableEnvironmentImpl.java | 6 +++-
.../table/operations/ShowPartitionsOperation.java | 17 +++++++++
6 files changed, 96 insertions(+), 9 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 093906c698c..c72ebbe8edc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -909,7 +909,14 @@ public class HiveCatalog extends AbstractCatalog {
.listPartitionNames(
tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1)
.stream()
- .map(HiveCatalog::createPartitionSpec)
+ .map(
+ p ->
+ createPartitionSpec(
+ p,
+ getHiveConf()
+ .getVar(
+ HiveConf.ConfVars
+ .DEFAULTPARTITIONNAME)))
.collect(Collectors.toList());
} catch (TException e) {
throw new CatalogException(
@@ -945,7 +952,14 @@ public class HiveCatalog extends AbstractCatalog {
partialVals,
(short) -1)
.stream()
- .map(HiveCatalog::createPartitionSpec)
+ .map(
+ p ->
+ createPartitionSpec(
+ p,
+ getHiveConf()
+ .getVar(
+ HiveConf.ConfVars
+ .DEFAULTPARTITIONNAME)))
.collect(Collectors.toList());
} catch (TException e) {
throw new CatalogException(
@@ -1139,21 +1153,28 @@ public class HiveCatalog extends AbstractCatalog {
/**
* Creates a {@link CatalogPartitionSpec} from a Hive partition name string. Example of Hive
- * partition name string - "name=bob/year=2019"
+ * partition name string - "name=bob/year=2019". If the partition name for the given partition
+ * column is equal to {@code defaultPartitionName}, the partition value in returned {@link
+ * CatalogPartitionSpec} will be null.
*/
- private static CatalogPartitionSpec createPartitionSpec(String hivePartitionName) {
+ private static CatalogPartitionSpec createPartitionSpec(
+ String hivePartitionName, String defaultPartitionName) {
String[] partKeyVals = hivePartitionName.split("/");
Map<String, String> spec = new HashMap<>(partKeyVals.length);
for (String keyVal : partKeyVals) {
String[] kv = keyVal.split("=");
- spec.put(unescapePathName(kv[0]), unescapePathName(kv[1]));
+ String partitionValue = unescapePathName(kv[1]);
+ spec.put(
+ unescapePathName(kv[0]),
+ partitionValue.equals(defaultPartitionName) ? null : partitionValue);
}
return new CatalogPartitionSpec(spec);
}
/**
* Get a list of ordered partition values by re-arranging them based on the given list of
- * partition keys.
+ * partition keys. If the partition value is null, it'll be converted into the default
+ * partition name.
*
* @param partitionSpec a partition spec.
* @param partitionKeys a list of partition keys.
@@ -1177,7 +1198,11 @@ public class HiveCatalog extends AbstractCatalog {
throw new PartitionSpecInvalidException(
getName(), partitionKeys, tablePath, partitionSpec);
} else {
- values.add(spec.get(key));
+ String value = spec.get(key);
+ if (value == null) {
+ value = getHiveConf().getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+ }
+ values.add(value);
}
}
@@ -1803,6 +1828,7 @@ public class HiveCatalog extends AbstractCatalog {
listPartitions(
new ObjectPath(
hiveTable.getDbName(), hiveTable.getTableName()))) {
+
Partition partition = getHivePartition(hiveTable, spec);
HiveTableUtil.alterColumns(partition.getSd(), catalogTable);
client.alter_partition(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 9874da8faf0..6730d5cde76 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -1738,7 +1738,10 @@ public class HiveParserDDLSemanticAnalyzer {
if (partSpec != null && !partSpec.isEmpty()) {
spec = new CatalogPartitionSpec(new HashMap<>(partSpec));
}
- return new ShowPartitionsOperation(tableIdentifier, spec);
+ return new ShowPartitionsOperation(
+ tableIdentifier,
+ spec,
+ HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
}
private Operation convertShowDatabases() {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index e7f21e6de8b..f3975885f23 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -971,6 +971,27 @@ public class HiveDialectITCase {
.collect());
assertThat(partitions).hasSize(1);
assertThat(partitions.toString()).contains("dt=2020-04-30 01:02:03/country=china");
+ // show partitions for the table containing default partition
+ tableEnv.executeSql("create table tb1 (a string) partitioned by (c int)");
+ tableEnv.executeSql(
+ "INSERT OVERWRITE TABLE tb1 PARTITION (c) values ('Col1', null), ('Col1', 5)")
+ .await();
+ String defaultPartitionName =
+ hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+
+ partitions =
+ CollectionUtil.iteratorToList(tableEnv.executeSql("show partitions tb1").collect());
+ assertThat(partitions.toString())
+ .isEqualTo(String.format("[+I[c=5], +I[c=%s]]", defaultPartitionName));
+ partitions =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql(
+ String.format(
+ "show partitions tb1 partition (c='%s')",
+ defaultPartitionName))
+ .collect());
+ assertThat(partitions.toString())
+ .isEqualTo(String.format("[+I[c=%s]]", defaultPartitionName));
}
@Test
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 1b1e06834dc..dc98c4743f0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -218,6 +218,22 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
assertThat(rows).hasSize(2);
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertThat(rowStrings).isEqualTo(new String[] {"+I[2014, 3, 0]", "+I[2014, 4, 0]"});
+
+ // test the case of pruning partitions by reading partitions from the catalog without a
+ // filter, when there exists a default partition
+ // insert null value for the partition column which will fall into the default partition
+ batchTableEnv
+ .executeSql(
+ "insert into source_db.test_table_pt_1 values ('2014', 1, null), ('2015', 2, null)")
+ .await();
+ // currently, the expression "is null" is not supported by HiveCatalog#listPartitionsByFilter,
+ // so the planner will list all partitions and then prune the partitions.
+ // the test is to cover such case
+ src =
+ batchTableEnv.sqlQuery(
+ "select * from hive.source_db.test_table_pt_1 where pt is null");
+ rows = CollectionUtil.iteratorToList(src.execute().collect());
+ assertThat(rows.toString()).isEqualTo("[+I[2014, 1, null], +I[2015, 2, null]]");
}
@Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index df95c584654..7f1af0fcb8c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1298,7 +1298,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size());
for (Map.Entry<String, String> partitionKV :
spec.getPartitionSpec().entrySet()) {
- partitionKVs.add(partitionKV.getKey() + "=" + partitionKV.getValue());
+ String partitionValue =
+ partitionKV.getValue() == null
+ ? showPartitionsOperation.getDefaultPartitionName()
+ : partitionKV.getValue();
+ partitionKVs.add(partitionKV.getKey() + "=" + partitionValue);
}
partitionNames.add(String.join("/", partitionKVs));
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
index c1e1c18d138..08a3ca7ea18 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
@@ -21,16 +21,29 @@ package org.apache.flink.table.operations;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import javax.annotation.Nullable;
+
/** Operation to describe a SHOW PARTITIONS statement. */
public class ShowPartitionsOperation implements ShowOperation {
protected final ObjectIdentifier tableIdentifier;
private final CatalogPartitionSpec partitionSpec;
+ // the name for the default partition, which usually means the partition's value is null or
+ // empty string
+ @Nullable private final String defaultPartitionName;
public ShowPartitionsOperation(
ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
+ this(tableIdentifier, partitionSpec, null);
+ }
+
+ public ShowPartitionsOperation(
+ ObjectIdentifier tableIdentifier,
+ CatalogPartitionSpec partitionSpec,
+ @Nullable String defaultPartitionName) {
this.tableIdentifier = tableIdentifier;
this.partitionSpec = partitionSpec;
+ this.defaultPartitionName = defaultPartitionName;
}
public ObjectIdentifier getTableIdentifier() {
@@ -41,6 +54,10 @@ public class ShowPartitionsOperation implements ShowOperation {
return partitionSpec;
}
+ public String getDefaultPartitionName() {
+ return defaultPartitionName;
+ }
+
@Override
public String asSummaryString() {
StringBuilder builder =