You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/24 03:38:07 UTC
[flink] branch release-1.11 updated: [FLINK-18900][hive]
HiveCatalog should error out when listing partitions with an invalid spec
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new ed09bd6 [FLINK-18900][hive] HiveCatalog should error out when listing partitions with an invalid spec
ed09bd6 is described below
commit ed09bd65b64ef64d4a0b6966a7227bbdf8231c29
Author: SteNicholas <pr...@163.com>
AuthorDate: Mon Aug 24 11:36:07 2020 +0800
[FLINK-18900][hive] HiveCatalog should error out when listing partitions with an invalid spec
This closes #13221
---
.../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++++++++++-
.../connector/jdbc/catalog/AbstractJdbcCatalog.java | 2 +-
.../flink/table/catalog/GenericInMemoryCatalog.java | 2 +-
.../java/org/apache/flink/table/catalog/Catalog.java | 2 +-
4 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index a994c5f..9a4eb3d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -765,13 +765,14 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
Table hiveTable = getHiveTable(tablePath);
ensurePartitionedTable(tablePath, hiveTable);
+ checkValidPartitionSpec(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath);
try {
// partition spec can be partial
@@ -996,6 +997,23 @@ public class HiveCatalog extends AbstractCatalog {
return values;
}
+ /**
+ * Check whether every key of the given partition spec is contained in the given list of partition keys.
+ *
+ * @param partitionSpec a partition spec.
+ * @param partitionKeys a list of partition keys.
+ * @param tablePath path of the table to which the partition belongs.
+ * @throws PartitionSpecInvalidException thrown if any key in partitionSpec doesn't exist in partitionKeys.
+ */
+ private void checkValidPartitionSpec(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+ throws PartitionSpecInvalidException {
+ for (String key : partitionSpec.getPartitionSpec().keySet()) {
+ if (!partitionKeys.contains(key)) {
+ throw new PartitionSpecInvalidException(getName(), partitionKeys, tablePath, partitionSpec);
+ }
+ }
+ }
+
private Partition getHivePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, PartitionSpecInvalidException, TException {
return getHivePartition(getHiveTable(tablePath), partitionSpec);
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 4b1ebfb..8230454 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -215,7 +215,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
}
@Override
- public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
return Collections.emptyList();
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index a4893b5..f515d62 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -533,7 +533,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
checkNotNull(tablePath);
checkNotNull(partitionSpec);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 1e4c482..8bda1ff 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -327,7 +327,7 @@ public interface Catalog {
* @throws CatalogException in case of any runtime exception
*/
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, CatalogException;
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;
/**
* Get CatalogPartitionSpec of partitions by expression filters in the table.