You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/06 14:39:59 UTC

[flink] branch master updated: [FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 099106d  [FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent
099106d is described below

commit 099106d8778ae39f9d99afa71a3590ae7ae916d7
Author: xuefuz <xu...@users.noreply.github.com>
AuthorDate: Mon May 6 07:39:44 2019 -0700

    [FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent
    
    This closes #8312
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  |   8 +-
 .../table/catalog/GenericInMemoryCatalog.java      | 133 +++++++++++++--------
 .../table/catalog/GenericInMemoryCatalogTest.java  |  85 ++++++-------
 .../flink/table/catalog/ReadableCatalog.java       |   9 +-
 .../table/catalog/ReadableWritableCatalog.java     |  14 +--
 .../exceptions/PartitionNotExistException.java     |   3 +-
 6 files changed, 140 insertions(+), 112 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 4c07938..a8fcf62 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -281,13 +281,13 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
 
 	@Override
 	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+			throws PartitionNotExistException, CatalogException {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+			throws PartitionNotExistException, CatalogException {
 		throw new UnsupportedOperationException();
 	}
 
@@ -299,13 +299,13 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+			throws PartitionNotExistException, CatalogException {
 		throw new UnsupportedOperationException();
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index b9568a5..dadd390 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -191,7 +191,7 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 		} else {
 			tables.put(tablePath, table.copy());
 
-			if ((table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned()) {
+			if (isPartitionedTable(tablePath)) {
 				partitions.put(tablePath, new LinkedHashMap<>());
 			}
 		}
@@ -376,24 +376,38 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 	@Override
 	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+		checkNotNull(partition);
 
-		validatePartitionSpec(tablePath, partitionSpec);
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
+
+		if (!isPartitionedTable(tablePath)) {
+			throw new TableNotPartitionedException(catalogName, tablePath);
+		}
 
 		if (partitionExists(tablePath, partitionSpec)) {
 			if (!ignoreIfExists) {
 				throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
 			}
-		} else {
-			partitions.get(tablePath).put(partitionSpec, partition.copy());
 		}
+
+		if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+			throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+				tablePath, partitionSpec);
+		}
+
+		partitions.get(tablePath).put(partitionSpec, partition.copy());
 	}
 
 	@Override
 	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
-
-		validatePartitionSpec(tablePath, partitionSpec);
+			throws PartitionNotExistException, CatalogException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
 
 		if (partitionExists(tablePath, partitionSpec)) {
 			partitions.get(tablePath).remove(partitionSpec);
@@ -404,9 +418,10 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 	@Override
 	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
-
-		validatePartitionSpec(tablePath, partitionSpec);
+			throws PartitionNotExistException, CatalogException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+		checkNotNull(newPartition);
 
 		if (partitionExists(tablePath, partitionSpec)) {
 			partitions.get(tablePath).put(partitionSpec, newPartition.copy());
@@ -417,18 +432,37 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
-		throws TableNotExistException, TableNotPartitionedException, CatalogException {
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		checkNotNull(tablePath);
+
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
 
-		validatePartitionedTable(tablePath);
+		if (!isPartitionedTable(tablePath)) {
+			throw new TableNotPartitionedException(catalogName, tablePath);
+		}
 
 		return new ArrayList<>(partitions.get(tablePath).keySet());
 	}
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
+
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
 
-		validatePartitionSpec(tablePath, partitionSpec);
+		if (!isPartitionedTable(tablePath)) {
+			throw new TableNotPartitionedException(catalogName, tablePath);
+		}
+
+		if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+			return new ArrayList<>();
+		}
 
 		return partitions.get(tablePath).keySet().stream()
 			.filter(ps -> ps.getPartitionSpec().entrySet().containsAll(partitionSpec.getPartitionSpec().entrySet()))
@@ -437,75 +471,76 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 	@Override
 	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
-
-		CatalogTable table = validatePartitionSpec(tablePath, partitionSpec);
-
-		if (partitionSpec.getPartitionSpec().size() < table.getPartitionKeys().size()) {
-			throw new PartitionSpecInvalidException(catalogName, table.getPartitionKeys(), tablePath, partitionSpec);
-		}
+			throws PartitionNotExistException, CatalogException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
 
-		if (partitionExists(tablePath, partitionSpec)) {
-			return partitions.get(tablePath).get(partitionSpec).copy();
-		} else {
+		if (!partitionExists(tablePath, partitionSpec)) {
 			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
 		}
+
+		return partitions.get(tablePath).get(partitionSpec).copy();
 	}
 
 	@Override
 	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws CatalogException {
+			throws CatalogException {
+		checkNotNull(tablePath);
+		checkNotNull(partitionSpec);
 
 		return partitions.containsKey(tablePath) && partitions.get(tablePath).containsKey(partitionSpec);
 	}
 
 	/**
-	 * Validate the partitioned table and partitionSpec.
+	 * Check if the given partitionSpec is valid for the given table.
+	 * Note that partition spec is considered invalid if the table doesn't exist or isn't partitioned.
 	 */
-	private CatalogTable validatePartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException {
+	private boolean isPartitionSpecValid(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) {
+		CatalogBaseTable baseTable;
+		try {
+			baseTable = getTable(tablePath);
+		} catch (TableNotExistException e) {
+			return false;
+		}
 
-		CatalogTable table = validatePartitionedTable(tablePath);
+		if (!(baseTable instanceof CatalogTable)) {
+			return false;
+		}
 
+		CatalogTable table =  (CatalogTable) baseTable;
 		List<String> partitionKeys = table.getPartitionKeys();
 		Map<String, String> spec = partitionSpec.getPartitionSpec();
 
 		// The size of partition spec should not exceed the size of partition keys
 		if (partitionKeys.size() < spec.size()) {
-			throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+			return false;
 		} else {
 			int size = spec.size();
 
 			// PartitionSpec should contain the first 'size' number of keys in partition key list
 			for (int i = 0; i < size; i++) {
 				if (!spec.containsKey(partitionKeys.get(i))) {
-					throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+					return false;
 				}
 			}
 		}
 
-		return table;
+		return true;
 	}
 
 	/**
-	 * Validate the partitioned table.
+	 * Check if the given table is a partitioned table.
+	 * Note that "false" is returned if the table doesn't exists.
 	 */
-	private CatalogTable validatePartitionedTable(ObjectPath tablePath)
-		throws TableNotExistException, TableNotPartitionedException {
-
-		CatalogBaseTable baseTable = getTable(tablePath);
-
-		if (!(baseTable instanceof CatalogTable)) {
-			throw new CatalogException(
-				String.format("%s in Catalog %s is not a CatalogTable", tablePath.getFullName(), catalogName));
-		}
-
-		CatalogTable table = (CatalogTable) baseTable;
-
-		if (!table.isPartitioned()) {
-			throw new TableNotPartitionedException(catalogName, tablePath);
+	private boolean isPartitionedTable(ObjectPath tablePath) {
+		CatalogBaseTable table = null;
+		try {
+			table = getTable(tablePath);
+		} catch (TableNotExistException e) {
+			return false;
 		}
 
-		return table;
+		return (table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned();
 	}
+
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 8bb7c66..f88db0d 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -492,11 +492,7 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		CatalogTestUtil.checkEquals(anotherPartition, catalog.getPartition(path1, anotherPartitionSpec));
 
 		CatalogPartitionSpec invalid = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionSpecInvalidException.class);
-		exception.expectMessage(
-			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s",
-				invalid, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.listPartitions(path1, invalid);
+		assertTrue(catalog.listPartitions(path1, invalid).isEmpty());
 	}
 
 	@Test
@@ -574,37 +570,41 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	@Test
-	public void testDropPartition_TableNotExistException() throws Exception {
+	public void testDropPartition_PartitionNotExistException_TableNotExist() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
 
-		exception.expect(TableNotExistException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.dropPartition(path1, createPartitionSpec(), false);
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.dropPartition(path1, partitionSpec, false);
 	}
 
 	@Test
-	public void testDropPartition_TableNotPartitionedException() throws Exception {
+	public void testDropPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		catalog.createTable(path1, createTable(), false);
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
 
-		exception.expect(TableNotPartitionedException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.dropPartition(path1, createPartitionSpec(), false);
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.dropPartition(path1, partitionSpec, false);
 	}
 
 	@Test
-	public void testDropPartition_PartitionSpecInvalidException() throws Exception {
+	public void testDropPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createPartitionedTable();
 		catalog.createTable(path1, table, false);
 
 		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionSpecInvalidException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
-				partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
 		catalog.dropPartition(path1, partitionSpec, false);
 	}
 
@@ -657,39 +657,41 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	@Test
-	public void testAlterPartition_TableNotExistException() throws Exception {
+	public void testAlterPartition_PartitionNotExistException_TableNotExist() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 
 		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(TableNotExistException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
 		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
 	}
 
 	@Test
-	public void testAlterPartition_TableNotPartitionedException() throws Exception {
+	public void testAlterPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		catalog.createTable(path1, createTable(), false);
 
 		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(TableNotPartitionedException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
 		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
 	}
 
 	@Test
-	public void testAlterPartition_PartitionSpecInvalidException() throws Exception {
+	public void testAlterPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createPartitionedTable();
 		catalog.createTable(path1, table, false);
 
 		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionSpecInvalidException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
-				partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
 		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
 	}
 
@@ -715,20 +717,21 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	@Test
-	public void testGetPartition_TableNotExistException() throws Exception {
-		exception.expect(TableNotExistException.class);
+	public void testGetPartition_PartitionNotExistException_TableNotExist() throws Exception {
+		exception.expect(PartitionNotExistException.class);
 		catalog.getPartition(path1, createPartitionSpec());
 	}
 
 	@Test
-	public void testGetPartition_TableNotPartitionedException() throws Exception {
+	public void testGetPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		catalog.createTable(path1, createTable(), false);
-
-		exception.expect(TableNotPartitionedException.class);
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.getPartition(path1, createPartitionSpec());
+			String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+				path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.getPartition(path1, partitionSpec);
 	}
 
 	@Test
@@ -738,15 +741,15 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		catalog.createTable(path1, table, false);
 
 		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionSpecInvalidException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
-				partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
 		catalog.getPartition(path1, partitionSpec);
 	}
 
 	@Test
-	public void testGetPartition_PartitionSpecInvalidException_sizeNotEqual() throws Exception {
+	public void testGetPartition_PartitionNotExistException_PartitionSpecInvalid_sizeNotEqual() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createPartitionedTable();
 		catalog.createTable(path1, table, false);
@@ -756,10 +759,10 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 				put("second", "bob");
 			}}
 		);
-		exception.expect(PartitionSpecInvalidException.class);
+		exception.expect(PartitionNotExistException.class);
 		exception.expectMessage(
-			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
-				partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
 		catalog.getPartition(path1, partitionSpec);
 	}
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index 1b08736..a8a69f2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 
@@ -160,11 +159,10 @@ public interface ReadableCatalog {
 	 *
 	 * @throws TableNotExistException thrown if the table does not exist in the catalog
 	 * @throws TableNotPartitionedException thrown if the table is not partitioned
-	 * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
 	 * @throws CatalogException in case of any runtime exception
 	 */
 	List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;
+		throws TableNotExistException, TableNotPartitionedException, CatalogException;
 
 	/**
 	 * Get a partition of the given table.
@@ -174,14 +172,11 @@ public interface ReadableCatalog {
 	 * @param partitionSpec partition spec of partition to get
 	 * @return the requested partition
 	 *
-	 * @throws TableNotExistException thrown if the table does not exist in the catalog
-	 * @throws TableNotPartitionedException thrown if the table is not partitioned
-	 * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid,
 	 * @throws PartitionNotExistException thrown if the partition is not partitioned
 	 * @throws CatalogException	in case of any runtime exception
 	 */
 	CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+		throws PartitionNotExistException, CatalogException;
 
 	/**
 	 * Check whether a partition exists or not.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 035b049..04d9bcb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -169,14 +169,11 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 *                          if set to false, throw an exception,
 	 *                          if set to true, nothing happens.
 	 *
-	 * @throws TableNotExistException thrown if the target table does not exist
-	 * @throws TableNotPartitionedException thrown if the target table is not partitioned
-	 * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
 	 * @throws PartitionNotExistException thrown if the target partition does not exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
 	void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+		throws PartitionNotExistException, CatalogException;
 
 	/**
 	 * Alter a partition.
@@ -188,14 +185,11 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 *                          if set to false, throw an exception,
 	 *                          if set to true, nothing happens.
 	 *
-	 * @throws TableNotExistException thrown if the target table does not exist
-	 * @throws TableNotPartitionedException thrown if the target table is not partitioned
-	 * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
 	 * @throws PartitionNotExistException thrown if the target partition does not exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
 	void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-		throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+		throws PartitionNotExistException, CatalogException;
 
 	// ------ functions ------
 
@@ -238,6 +232,6 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 * @throws FunctionNotExistException if the function does not exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
-	void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException,
-		CatalogException;
+	void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+		throws FunctionNotExistException, CatalogException;
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
index 597f85d..8d58bcc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
@@ -22,7 +22,8 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 
 /**
- * Exception for operation on a nonexistent partition.
+ * Exception for operation on a partition that doesn't exist. The cause includes non-existent table,
+ * non-partitioned table, invalid partition spec, etc.
  */
 public class PartitionNotExistException extends Exception {
 	private static final String MSG = "Partition %s of table %s in catalog %s does not exist.";