You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/24 03:38:07 UTC

[flink] branch release-1.11 updated: [FLINK-18900][hive] HiveCatalog should error out when listing partitions with an invalid spec

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new ed09bd6  [FLINK-18900][hive] HiveCatalog should error out when listing partitions with an invalid spec
ed09bd6 is described below

commit ed09bd65b64ef64d4a0b6966a7227bbdf8231c29
Author: SteNicholas <pr...@163.com>
AuthorDate: Mon Aug 24 11:36:07 2020 +0800

    [FLINK-18900][hive] HiveCatalog should error out when listing partitions with an invalid spec
    
    This closes #13221
---
 .../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++++++++++-
 .../connector/jdbc/catalog/AbstractJdbcCatalog.java  |  2 +-
 .../flink/table/catalog/GenericInMemoryCatalog.java  |  2 +-
 .../java/org/apache/flink/table/catalog/Catalog.java |  2 +-
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index a994c5f..9a4eb3d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -765,13 +765,14 @@ public class HiveCatalog extends AbstractCatalog {
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
 		checkNotNull(tablePath, "Table path cannot be null");
 		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
 
 		Table hiveTable = getHiveTable(tablePath);
 
 		ensurePartitionedTable(tablePath, hiveTable);
+		checkValidPartitionSpec(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath);
 
 		try {
 			// partition spec can be partial
@@ -996,6 +997,23 @@ public class HiveCatalog extends AbstractCatalog {
 		return values;
 	}
 
+	/**
+	 * Check that every key of the given partition spec is one of the given partition keys.
+	 *
+	 * @param partitionSpec a partition spec; it may cover only a subset of partitionKeys.
+	 * @param partitionKeys a list of partition keys.
+	 * @param tablePath path of the table to which the partition belongs.
+	 * @throws PartitionSpecInvalidException thrown if any key in partitionSpec doesn't exist in partitionKeys.
+	 */
+	private void checkValidPartitionSpec(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+		throws PartitionSpecInvalidException {
+		for (String key : partitionSpec.getPartitionSpec().keySet()) {
+			if (!partitionKeys.contains(key)) {
+				throw new PartitionSpecInvalidException(getName(), partitionKeys, tablePath, partitionSpec);
+			}
+		}
+	}
+
 	private Partition getHivePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
 			throws TableNotExistException, PartitionSpecInvalidException, TException {
 		return getHivePartition(getHiveTable(tablePath), partitionSpec);
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 4b1ebfb..8230454 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -215,7 +215,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 	}
 
 	@Override
-	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
 		return Collections.emptyList();
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index a4893b5..f515d62 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -533,7 +533,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
 		checkNotNull(tablePath);
 		checkNotNull(partitionSpec);
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 1e4c482..8bda1ff 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -327,7 +327,7 @@ public interface Catalog {
 	 * @throws CatalogException in case of any runtime exception
 	 */
 	List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws TableNotExistException, TableNotPartitionedException, CatalogException;
+		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;
 
 	/**
 	 * Get CatalogPartitionSpec of partitions by expression filters in the table.