You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/06 14:39:59 UTC
[flink] branch master updated: [FLINK-12366][table] Clean up
catalog APIs to make them more consistent and coherent
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 099106d [FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent
099106d is described below
commit 099106d8778ae39f9d99afa71a3590ae7ae916d7
Author: xuefuz <xu...@users.noreply.github.com>
AuthorDate: Mon May 6 07:39:44 2019 -0700
[FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent
This closes #8312
---
.../catalog/hive/GenericHiveMetastoreCatalog.java | 8 +-
.../table/catalog/GenericInMemoryCatalog.java | 133 +++++++++++++--------
.../table/catalog/GenericInMemoryCatalogTest.java | 85 ++++++-------
.../flink/table/catalog/ReadableCatalog.java | 9 +-
.../table/catalog/ReadableWritableCatalog.java | 14 +--
.../exceptions/PartitionNotExistException.java | 3 +-
6 files changed, 140 insertions(+), 112 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 4c07938..a8fcf62 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -281,13 +281,13 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+ throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+ throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@@ -299,13 +299,13 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+ throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index b9568a5..dadd390 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -191,7 +191,7 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
} else {
tables.put(tablePath, table.copy());
- if ((table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned()) {
+ if (isPartitionedTable(tablePath)) {
partitions.put(tablePath, new LinkedHashMap<>());
}
}
@@ -376,24 +376,38 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+ checkNotNull(partition);
- validatePartitionSpec(tablePath, partitionSpec);
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+
+ if (!isPartitionedTable(tablePath)) {
+ throw new TableNotPartitionedException(catalogName, tablePath);
+ }
if (partitionExists(tablePath, partitionSpec)) {
if (!ignoreIfExists) {
throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
}
- } else {
- partitions.get(tablePath).put(partitionSpec, partition.copy());
}
+
+ if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+ throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+ tablePath, partitionSpec);
+ }
+
+ partitions.get(tablePath).put(partitionSpec, partition.copy());
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
-
- validatePartitionSpec(tablePath, partitionSpec);
+ throws PartitionNotExistException, CatalogException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
if (partitionExists(tablePath, partitionSpec)) {
partitions.get(tablePath).remove(partitionSpec);
@@ -404,9 +418,10 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
-
- validatePartitionSpec(tablePath, partitionSpec);
+ throws PartitionNotExistException, CatalogException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+ checkNotNull(newPartition);
if (partitionExists(tablePath, partitionSpec)) {
partitions.get(tablePath).put(partitionSpec, newPartition.copy());
@@ -417,18 +432,37 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
- throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ checkNotNull(tablePath);
+
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
- validatePartitionedTable(tablePath);
+ if (!isPartitionedTable(tablePath)) {
+ throw new TableNotPartitionedException(catalogName, tablePath);
+ }
return new ArrayList<>(partitions.get(tablePath).keySet());
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
+
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
- validatePartitionSpec(tablePath, partitionSpec);
+ if (!isPartitionedTable(tablePath)) {
+ throw new TableNotPartitionedException(catalogName, tablePath);
+ }
+
+ if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+ return new ArrayList<>();
+ }
return partitions.get(tablePath).keySet().stream()
.filter(ps -> ps.getPartitionSpec().entrySet().containsAll(partitionSpec.getPartitionSpec().entrySet()))
@@ -437,75 +471,76 @@ public class GenericInMemoryCatalog implements ReadableWritableCatalog {
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
-
- CatalogTable table = validatePartitionSpec(tablePath, partitionSpec);
-
- if (partitionSpec.getPartitionSpec().size() < table.getPartitionKeys().size()) {
- throw new PartitionSpecInvalidException(catalogName, table.getPartitionKeys(), tablePath, partitionSpec);
- }
+ throws PartitionNotExistException, CatalogException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
- if (partitionExists(tablePath, partitionSpec)) {
- return partitions.get(tablePath).get(partitionSpec).copy();
- } else {
+ if (!partitionExists(tablePath, partitionSpec)) {
throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
}
+
+ return partitions.get(tablePath).get(partitionSpec).copy();
}
@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws CatalogException {
+ throws CatalogException {
+ checkNotNull(tablePath);
+ checkNotNull(partitionSpec);
return partitions.containsKey(tablePath) && partitions.get(tablePath).containsKey(partitionSpec);
}
/**
- * Validate the partitioned table and partitionSpec.
+ * Check if the given partitionSpec is valid for the given table.
+ * Note that partition spec is considered invalid if the table doesn't exist or isn't partitioned.
*/
- private CatalogTable validatePartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException {
+ private boolean isPartitionSpecValid(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) {
+ CatalogBaseTable baseTable;
+ try {
+ baseTable = getTable(tablePath);
+ } catch (TableNotExistException e) {
+ return false;
+ }
- CatalogTable table = validatePartitionedTable(tablePath);
+ if (!(baseTable instanceof CatalogTable)) {
+ return false;
+ }
+ CatalogTable table = (CatalogTable) baseTable;
List<String> partitionKeys = table.getPartitionKeys();
Map<String, String> spec = partitionSpec.getPartitionSpec();
// The size of partition spec should not exceed the size of partition keys
if (partitionKeys.size() < spec.size()) {
- throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ return false;
} else {
int size = spec.size();
// PartitionSpec should contain the first 'size' number of keys in partition key list
for (int i = 0; i < size; i++) {
if (!spec.containsKey(partitionKeys.get(i))) {
- throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ return false;
}
}
}
- return table;
+ return true;
}
/**
- * Validate the partitioned table.
+ * Check if the given table is a partitioned table.
+ * Note that "false" is returned if the table doesn't exists.
*/
- private CatalogTable validatePartitionedTable(ObjectPath tablePath)
- throws TableNotExistException, TableNotPartitionedException {
-
- CatalogBaseTable baseTable = getTable(tablePath);
-
- if (!(baseTable instanceof CatalogTable)) {
- throw new CatalogException(
- String.format("%s in Catalog %s is not a CatalogTable", tablePath.getFullName(), catalogName));
- }
-
- CatalogTable table = (CatalogTable) baseTable;
-
- if (!table.isPartitioned()) {
- throw new TableNotPartitionedException(catalogName, tablePath);
+ private boolean isPartitionedTable(ObjectPath tablePath) {
+ CatalogBaseTable table = null;
+ try {
+ table = getTable(tablePath);
+ } catch (TableNotExistException e) {
+ return false;
}
- return table;
+ return (table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned();
}
+
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 8bb7c66..f88db0d 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -492,11 +492,7 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
CatalogTestUtil.checkEquals(anotherPartition, catalog.getPartition(path1, anotherPartitionSpec));
CatalogPartitionSpec invalid = createInvalidPartitionSpecSubset();
- exception.expect(PartitionSpecInvalidException.class);
- exception.expectMessage(
- String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s",
- invalid, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
- catalog.listPartitions(path1, invalid);
+ assertTrue(catalog.listPartitions(path1, invalid).isEmpty());
}
@Test
@@ -574,37 +570,41 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
}
@Test
- public void testDropPartition_TableNotExistException() throws Exception {
+ public void testDropPartition_PartitionNotExistException_TableNotExist() throws Exception {
catalog.createDatabase(db1, createDb(), false);
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(TableNotExistException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
- catalog.dropPartition(path1, createPartitionSpec(), false);
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.dropPartition(path1, partitionSpec, false);
}
@Test
- public void testDropPartition_TableNotPartitionedException() throws Exception {
+ public void testDropPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
catalog.createDatabase(db1, createDb(), false);
catalog.createTable(path1, createTable(), false);
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(TableNotPartitionedException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
- catalog.dropPartition(path1, createPartitionSpec(), false);
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.dropPartition(path1, partitionSpec, false);
}
@Test
- public void testDropPartition_PartitionSpecInvalidException() throws Exception {
+ public void testDropPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogTable table = createPartitionedTable();
catalog.createTable(path1, table, false);
CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionSpecInvalidException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.dropPartition(path1, partitionSpec, false);
}
@@ -657,39 +657,41 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
}
@Test
- public void testAlterPartition_TableNotExistException() throws Exception {
+ public void testAlterPartition_PartitionNotExistException_TableNotExist() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(TableNotExistException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, createPartition(), false);
}
@Test
- public void testAlterPartition_TableNotPartitionedException() throws Exception {
+ public void testAlterPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
catalog.createDatabase(db1, createDb(), false);
catalog.createTable(path1, createTable(), false);
CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(TableNotPartitionedException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, createPartition(), false);
}
@Test
- public void testAlterPartition_PartitionSpecInvalidException() throws Exception {
+ public void testAlterPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogTable table = createPartitionedTable();
catalog.createTable(path1, table, false);
CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionSpecInvalidException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, createPartition(), false);
}
@@ -715,20 +717,21 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
}
@Test
- public void testGetPartition_TableNotExistException() throws Exception {
- exception.expect(TableNotExistException.class);
+ public void testGetPartition_PartitionNotExistException_TableNotExist() throws Exception {
+ exception.expect(PartitionNotExistException.class);
catalog.getPartition(path1, createPartitionSpec());
}
@Test
- public void testGetPartition_TableNotPartitionedException() throws Exception {
+ public void testGetPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
catalog.createDatabase(db1, createDb(), false);
catalog.createTable(path1, createTable(), false);
-
- exception.expect(TableNotPartitionedException.class);
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
- catalog.getPartition(path1, createPartitionSpec());
+ String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+ path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.getPartition(path1, partitionSpec);
}
@Test
@@ -738,15 +741,15 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
catalog.createTable(path1, table, false);
CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionSpecInvalidException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.getPartition(path1, partitionSpec);
}
@Test
- public void testGetPartition_PartitionSpecInvalidException_sizeNotEqual() throws Exception {
+ public void testGetPartition_PartitionNotExistException_PartitionSpecInvalid_sizeNotEqual() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogTable table = createPartitionedTable();
catalog.createTable(path1, table, false);
@@ -756,10 +759,10 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
put("second", "bob");
}}
);
- exception.expect(PartitionSpecInvalidException.class);
+ exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.getPartition(path1, partitionSpec);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index 1b08736..a8a69f2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
@@ -160,11 +159,10 @@ public interface ReadableCatalog {
*
* @throws TableNotExistException thrown if the table does not exist in the catalog
* @throws TableNotPartitionedException thrown if the table is not partitioned
- * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
* @throws CatalogException in case of any runtime exception
*/
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;
+ throws TableNotExistException, TableNotPartitionedException, CatalogException;
/**
* Get a partition of the given table.
@@ -174,14 +172,11 @@ public interface ReadableCatalog {
* @param partitionSpec partition spec of partition to get
* @return the requested partition
*
- * @throws TableNotExistException thrown if the table does not exist in the catalog
- * @throws TableNotPartitionedException thrown if the table is not partitioned
- * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid,
* @throws PartitionNotExistException thrown if the partition is not partitioned
* @throws CatalogException in case of any runtime exception
*/
CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+ throws PartitionNotExistException, CatalogException;
/**
* Check whether a partition exists or not.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 035b049..04d9bcb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -169,14 +169,11 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
* if set to false, throw an exception,
* if set to true, nothing happens.
*
- * @throws TableNotExistException thrown if the target table does not exist
- * @throws TableNotPartitionedException thrown if the target table is not partitioned
- * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
* @throws PartitionNotExistException thrown if the target partition does not exist
* @throws CatalogException in case of any runtime exception
*/
void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+ throws PartitionNotExistException, CatalogException;
/**
* Alter a partition.
@@ -188,14 +185,11 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
* if set to false, throw an exception,
* if set to true, nothing happens.
*
- * @throws TableNotExistException thrown if the target table does not exist
- * @throws TableNotPartitionedException thrown if the target table is not partitioned
- * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
* @throws PartitionNotExistException thrown if the target partition does not exist
* @throws CatalogException in case of any runtime exception
*/
void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+ throws PartitionNotExistException, CatalogException;
// ------ functions ------
@@ -238,6 +232,6 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
* @throws FunctionNotExistException if the function does not exist
* @throws CatalogException in case of any runtime exception
*/
- void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException,
- CatalogException;
+ void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException;
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
index 597f85d..8d58bcc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
@@ -22,7 +22,8 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
/**
- * Exception for operation on a nonexistent partition.
+ * Exception for operation on a partition that doesn't exist. The cause includes non-existent table,
+ * non-partitioned table, invalid partition spec, etc.
*/
public class PartitionNotExistException extends Exception {
private static final String MSG = "Partition %s of table %s in catalog %s does not exist.";