You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/24 05:03:52 UTC
[flink] branch master updated: [FLINK-12235][hive] Support
partition related operations in HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 95e1686 [FLINK-12235][hive] Support partition related operations in HiveCatalog
95e1686 is described below
commit 95e16862822922972f70a710bc09b7ec31c9043b
Author: Rui Li <li...@apache.org>
AuthorDate: Wed May 15 19:44:06 2019 +0800
[FLINK-12235][hive] Support partition related operations in HiveCatalog
This PR adds support for Hive partitions in HiveCatalog.
This closes #8449.
---
.../flink/table/catalog/hive/HiveCatalog.java | 270 ++++++++++++++-
.../table/catalog/hive/HiveCatalogPartition.java | 61 ++++
.../hive/HiveCatalogGenericMetadataTest.java | 108 ++++++
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 18 +
.../table/catalog/GenericInMemoryCatalog.java | 82 ++---
.../table/catalog/GenericInMemoryCatalogTest.java | 371 +-------------------
.../org/apache/flink/table/catalog/Catalog.java | 2 +-
.../flink/table/catalog/CatalogTestBase.java | 379 +++++++++++++++++++++
.../flink/table/catalog/CatalogTestUtil.java | 4 -
9 files changed, 859 insertions(+), 436 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1f0bddc..d2387f0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
import org.apache.flink.table.catalog.GenericCatalogFunction;
+import org.apache.flink.table.catalog.GenericCatalogPartition;
import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -53,6 +54,7 @@ import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -500,7 +503,7 @@ public class HiveCatalog implements Catalog {
// Partition keys
List<String> partitionKeys = new ArrayList<>();
if (!hiveTable.getPartitionKeys().isEmpty()) {
- partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
+ partitionKeys = getFieldNames(hiveTable.getPartitionKeys());
}
if (isView) {
@@ -608,44 +611,285 @@ public class HiveCatalog implements Catalog {
// ------ partitions ------
@Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+ try {
+ return getHivePartition(tablePath, partitionSpec) != null;
+ } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+ }
+ }
+
+ @Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+ checkNotNull(partition, "Partition cannot be null");
+
+ if (!(partition instanceof HiveCatalogPartition)) {
+ throw new CatalogException("Currently only supports HiveCatalogPartition");
+ }
+
+ Table hiveTable = getHiveTable(tablePath);
+
+ ensureTableAndPartitionMatch(hiveTable, partition);
+
+ ensurePartitionedTable(tablePath, hiveTable);
+
+ try {
+ client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to create partition %s of table %s", partitionSpec, tablePath));
+ }
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
- @Override
- public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ Table hiveTable = getHiveTable(tablePath);
+ client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(),
+ getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ }
+ } catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath));
+ }
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(tablePath, "Table path cannot be null");
+
+ Table hiveTable = getHiveTable(tablePath);
+
+ ensurePartitionedTable(tablePath, hiveTable);
+
+ try {
+ // pass -1 as max_parts to fetch all partitions
+ return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1).stream()
+ .map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list partitions of table %s", tablePath), e);
+ }
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+ Table hiveTable = getHiveTable(tablePath);
+
+ ensurePartitionedTable(tablePath, hiveTable);
+
+ try {
+ // partition spec can be partial
+ List<String> partialVals = MetaStoreUtils.getPvals(hiveTable.getPartitionKeys(), partitionSpec.getPartitionSpec());
+ return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), partialVals,
+ (short) -1).stream().map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list partitions of table %s", tablePath), e);
+ }
}
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+ try {
+ Partition hivePartition = getHivePartition(tablePath, partitionSpec);
+ return instantiateCatalogPartition(hivePartition);
+ } catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+ }
}
@Override
- public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
- throw new UnsupportedOperationException();
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+ checkNotNull(newPartition, "New partition cannot be null");
+
+ if (!(newPartition instanceof HiveCatalogPartition)) {
+ throw new CatalogException("Currently only supports HiveCatalogPartition");
+ }
+
+ // Explicitly check if the partition exists or not
+ // because alter_partition() doesn't throw NoSuchObjectException like dropPartition() when the target doesn't exist
+ try {
+ Table hiveTable = getHiveTable(tablePath);
+ ensureTableAndPartitionMatch(hiveTable, newPartition);
+ Partition oldHivePartition = getHivePartition(hiveTable, partitionSpec);
+ if (oldHivePartition == null) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ }
+ Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
+ if (newHivePartition.getSd().getLocation() == null) {
+ newHivePartition.getSd().setLocation(oldHivePartition.getSd().getLocation());
+ }
+ client.alter_partition(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ newHivePartition
+ );
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ }
+ } catch (InvalidOperationException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+ throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to alter existing partition with new partition %s of table %s",
+ partitionSpec, tablePath), e);
+ }
+ }
+
+ // make sure both table and partition are generic, or neither is
+ private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
+ boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+ if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
+ (!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
+ throw new CatalogException(String.format("Cannot handle %s partition for %s table",
+ catalogPartition.getClass().getName(), isGeneric ? "generic" : "non-generic"));
+ }
+ }
+
+ private Partition instantiateHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+ throws PartitionSpecInvalidException {
+ Partition partition = new Partition();
+ List<String> partCols = getFieldNames(hiveTable.getPartitionKeys());
+ List<String> partValues = getOrderedFullPartitionValues(
+ partitionSpec, partCols, new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()));
+ // validate partition values
+ for (int i = 0; i < partCols.size(); i++) {
+ if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
+ throw new PartitionSpecInvalidException(catalogName, partCols,
+ new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec);
+ }
+ }
+ // TODO: handle GenericCatalogPartition
+ HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
+ partition.setValues(partValues);
+ partition.setDbName(hiveTable.getDbName());
+ partition.setTableName(hiveTable.getTableName());
+ partition.setCreateTime((int) (System.currentTimeMillis() / 1000));
+ partition.setParameters(hiveCatalogPartition.getProperties());
+ partition.setSd(hiveTable.getSd().deepCopy());
+ partition.getSd().setLocation(hiveCatalogPartition.getLocation());
+
+ return partition;
+ }
+
+ private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
+ // TODO: create GenericCatalogPartition for GenericCatalogTable
+ return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation());
+ }
+
+ private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
+ if (hiveTable.getPartitionKeysSize() == 0) {
+ throw new TableNotPartitionedException(catalogName, tablePath);
+ }
+ }
+
+ /**
+ * Get field names from field schemas.
+ */
+ private static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+ List<String> names = new ArrayList<>(fieldSchemas.size());
+ for (FieldSchema fs : fieldSchemas) {
+ names.add(fs.getName());
+ }
+ return names;
+ }
+
+ /**
+ * Creates a {@link CatalogPartitionSpec} from a Hive partition name string.
+ * Example of Hive partition name string - "name=bob/year=2019"
+ */
+ private static CatalogPartitionSpec createPartitionSpec(String hivePartitionName) {
+ String[] partKeyVals = hivePartitionName.split("/");
+ Map<String, String> spec = new HashMap<>(partKeyVals.length);
+ for (String keyVal : partKeyVals) {
+ String[] kv = keyVal.split("=");
+ spec.put(kv[0], kv[1]);
+ }
+ return new CatalogPartitionSpec(spec);
+ }
+
+ /**
+ * Get a list of ordered partition values by re-arranging them based on the given list of partition keys.
+ *
+ * @param partitionSpec a partition spec.
+ * @param partitionKeys a list of partition keys.
+ * @param tablePath path of the table to which the partition belongs.
+ * @return A list of partition values ordered according to partitionKeys.
+ * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have different sizes,
+ * or any key in partitionKeys doesn't exist in partitionSpec.
+ */
+ private List<String> getOrderedFullPartitionValues(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+ throws PartitionSpecInvalidException {
+ Map<String, String> spec = partitionSpec.getPartitionSpec();
+ if (spec.size() != partitionKeys.size()) {
+ throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ }
+
+ List<String> values = new ArrayList<>(spec.size());
+ for (String key : partitionKeys) {
+ if (!spec.containsKey(key)) {
+ throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ } else {
+ values.add(spec.get(key));
+ }
+ }
+
+ return values;
+ }
+
+ private Partition getHivePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, PartitionSpecInvalidException, TException {
+ return getHivePartition(getHiveTable(tablePath), partitionSpec);
+ }
+
+ private Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
+ throws PartitionSpecInvalidException, TException {
+ return client.getPartition(hiveTable.getDbName(), hiveTable.getTableName(),
+ getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()),
+ new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName())));
}
// ------ functions ------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
new file mode 100644
index 0000000..98b13a2
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.AbstractCatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A CatalogPartition implementation that represents a Partition in Hive.
+ */
+public class HiveCatalogPartition extends AbstractCatalogPartition {
+ private final String location;
+
+ public HiveCatalogPartition(Map<String, String> properties, String location) {
+ super(properties, null);
+ this.location = location;
+ }
+
+ public HiveCatalogPartition(Map<String, String> properties) {
+ this(properties, null);
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ @Override
+ public CatalogPartition copy() {
+ return new HiveCatalogPartition(new HashMap<>(getProperties()), location);
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.empty();
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 9a35068..90ec11e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
@@ -92,6 +93,108 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
checkEquals(table, (CatalogTable) catalog.getTable(path1));
}
+ // ------ partitions ------
+
+ @Test
+ public void testCreatePartition() throws Exception {
+ }
+
+ @Test
+ public void testCreatePartition_TableNotExistException() throws Exception {
+ }
+
+ @Test
+ public void testCreatePartition_TableNotPartitionedException() throws Exception {
+ }
+
+ @Test
+ public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
+ }
+
+ @Test
+ public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
+ }
+
+ @Test
+ public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
+ }
+
+ @Test
+ public void testDropPartition() throws Exception {
+ }
+
+ @Test
+ public void testDropPartition_TableNotExist() throws Exception {
+ }
+
+ @Test
+ public void testDropPartition_TableNotPartitioned() throws Exception {
+ }
+
+ @Test
+ public void testDropPartition_PartitionSpecInvalid() throws Exception {
+ }
+
+ @Test
+ public void testDropPartition_PartitionNotExist() throws Exception {
+ }
+
+ @Test
+ public void testDropPartition_PartitionNotExist_ignored() throws Exception {
+ }
+
+ @Test
+ public void testAlterPartition() throws Exception {
+ }
+
+ @Test
+ public void testAlterPartition_TableNotExist() throws Exception {
+ }
+
+ @Test
+ public void testAlterPartition_TableNotPartitioned() throws Exception {
+ }
+
+ @Test
+ public void testAlterPartition_PartitionSpecInvalid() throws Exception {
+ }
+
+ @Test
+ public void testAlterPartition_PartitionNotExist() throws Exception {
+ }
+
+ @Test
+ public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
+ }
+
+ @Test
+ public void testGetPartition_TableNotExist() throws Exception {
+ }
+
+ @Test
+ public void testGetPartition_TableNotPartitioned() throws Exception {
+ }
+
+ @Test
+ public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() throws Exception {
+ }
+
+ @Test
+ public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() throws Exception {
+ }
+
+ @Test
+ public void testGetPartition_PartitionNotExist() throws Exception {
+ }
+
+ @Test
+ public void testPartitionExists() throws Exception {
+ }
+
+ @Test
+ public void testListPartitionPartialSpec() throws Exception {
+ }
+
// ------ test utils ------
@Override
@@ -183,4 +286,9 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
protected CatalogFunction createAnotherFunction() {
return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
}
+
+ @Override
+ public CatalogPartition createPartition() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 1126564..e2d95a3 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
@@ -144,6 +145,11 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
}
@Override
+ public CatalogPartition createPartition() {
+ return new HiveCatalogPartition(getBatchTableProperties());
+ }
+
+ @Override
public void checkEquals(CatalogTable t1, CatalogTable t2) {
assertEquals(t1.getSchema(), t2.getSchema());
assertEquals(t1.getComment(), t2.getComment());
@@ -155,6 +161,7 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
}
+ @Override
protected void checkEquals(CatalogView v1, CatalogView v2) {
assertEquals(v1.getSchema(), v1.getSchema());
assertEquals(v1.getComment(), v2.getComment());
@@ -165,4 +172,15 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
// thus properties of Hive view is a super set of those in its corresponding Flink view
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
}
+
+ @Override
+ protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
+ assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
+ assertEquals(expected.getClass(), actual.getClass());
+ HiveCatalogPartition hivePartition1 = (HiveCatalogPartition) expected;
+ HiveCatalogPartition hivePartition2 = (HiveCatalogPartition) actual;
+ assertEquals(hivePartition1.getDescription(), hivePartition2.getDescription());
+ assertEquals(hivePartition1.getDetailedDescription(), hivePartition2.getDetailedDescription());
+ assertTrue(hivePartition2.getProperties().entrySet().containsAll(hivePartition1.getProperties().entrySet()));
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index c90e1722..d46a423 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -344,6 +344,12 @@ public class GenericInMemoryCatalog implements Catalog {
return databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
}
+ private void ensureTableExists(ObjectPath tablePath) throws TableNotExistException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ }
+
// ------ functions ------
@Override
@@ -437,13 +443,9 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(partitionSpec);
checkNotNull(partition);
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- if (!isPartitionedTable(tablePath)) {
- throw new TableNotPartitionedException(catalogName, tablePath);
- }
+ ensureTableExists(tablePath);
+ ensurePartitionedTable(tablePath);
+ ensureFullPartitionSpec(tablePath, partitionSpec);
if (partitionExists(tablePath, partitionSpec)) {
if (!ignoreIfExists) {
@@ -451,11 +453,6 @@ public class GenericInMemoryCatalog implements Catalog {
}
}
- if (!isPartitionSpecValid(tablePath, partitionSpec)) {
- throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
- tablePath, partitionSpec);
- }
-
partitions.get(tablePath).put(partitionSpec, partition.copy());
}
@@ -502,13 +499,8 @@ public class GenericInMemoryCatalog implements Catalog {
throws TableNotExistException, TableNotPartitionedException, CatalogException {
checkNotNull(tablePath);
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- if (!isPartitionedTable(tablePath)) {
- throw new TableNotPartitionedException(catalogName, tablePath);
- }
+ ensureTableExists(tablePath);
+ ensurePartitionedTable(tablePath);
return new ArrayList<>(partitions.get(tablePath).keySet());
}
@@ -519,15 +511,12 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(tablePath);
checkNotNull(partitionSpec);
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- if (!isPartitionedTable(tablePath)) {
- throw new TableNotPartitionedException(catalogName, tablePath);
- }
+ ensurePartitionedTable(tablePath);
- if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+ CatalogTable catalogTable = (CatalogTable) getTable(tablePath);
+ List<String> partKeys = catalogTable.getPartitionKeys();
+ Map<String, String> spec = partitionSpec.getPartitionSpec();
+ if (!partKeys.containsAll(spec.keySet())) {
return new ArrayList<>();
}
@@ -558,41 +547,36 @@ public class GenericInMemoryCatalog implements Catalog {
return partitions.containsKey(tablePath) && partitions.get(tablePath).containsKey(partitionSpec);
}
+ private void ensureFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, PartitionSpecInvalidException {
+ if (!isFullPartitionSpec(tablePath, partitionSpec)) {
+ throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+ tablePath, partitionSpec);
+ }
+ }
+
/**
- * Check if the given partitionSpec is valid for the given table.
- * Note that partition spec is considered invalid if the table doesn't exist or isn't partitioned.
+ * Check if the given partitionSpec is full partition spec for the given table.
*/
- private boolean isPartitionSpecValid(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) {
- CatalogBaseTable baseTable;
- try {
- baseTable = getTable(tablePath);
- } catch (TableNotExistException e) {
- return false;
- }
+ private boolean isFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException {
+ CatalogBaseTable baseTable = getTable(tablePath);
if (!(baseTable instanceof CatalogTable)) {
return false;
}
- CatalogTable table = (CatalogTable) baseTable;
+ CatalogTable table = (CatalogTable) baseTable;
List<String> partitionKeys = table.getPartitionKeys();
Map<String, String> spec = partitionSpec.getPartitionSpec();
// The size of partition spec should not exceed the size of partition keys
- if (partitionKeys.size() < spec.size()) {
- return false;
- } else {
- int size = spec.size();
+ return partitionKeys.size() == spec.size() && spec.keySet().containsAll(partitionKeys);
+ }
- // PartitionSpec should contain the first 'size' number of keys in partition key list
- for (int i = 0; i < size; i++) {
- if (!spec.containsKey(partitionKeys.get(i))) {
- return false;
- }
- }
+ private void ensurePartitionedTable(ObjectPath tablePath) throws TableNotPartitionedException {
+ if (!isPartitionedTable(tablePath)) {
+ throw new TableNotPartitionedException(catalogName, tablePath);
}
-
- return true;
}
/**
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d332456..89eb043 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -19,11 +19,6 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -39,13 +34,10 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -100,194 +92,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
// ------ partitions ------
@Test
- public void testCreatePartition() throws Exception {
- CatalogTable table = createPartitionedTable();
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, table, false);
-
- assertTrue(catalog.listPartitions(path1).isEmpty());
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- catalog.createPartition(path1, partitionSpec, createPartition(), false);
-
- assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1));
- assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1, createPartitionSpecSubset()));
- CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
-
- CatalogPartitionSpec anotherPartitionSpec = createAnotherPartitionSpec();
- CatalogPartition anotherPartition = createAnotherPartition();
- catalog.createPartition(path1, anotherPartitionSpec, anotherPartition, false);
-
- assertEquals(Arrays.asList(partitionSpec, anotherPartitionSpec), catalog.listPartitions(path1));
- assertEquals(Arrays.asList(partitionSpec, anotherPartitionSpec), catalog.listPartitions(path1, createPartitionSpecSubset()));
- CatalogTestUtil.checkEquals(anotherPartition, catalog.getPartition(path1, anotherPartitionSpec));
-
- CatalogPartitionSpec invalid = createInvalidPartitionSpecSubset();
- assertTrue(catalog.listPartitions(path1, invalid).isEmpty());
- }
-
- @Test
- public void testCreatePartition_TableNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- exception.expect(TableNotExistException.class);
- exception.expectMessage(
- String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
- catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
- }
-
- @Test
- public void testCreatePartition_TableNotPartitionedException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
-
- exception.expect(TableNotPartitionedException.class);
- exception.expectMessage(
- String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
- catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
- }
-
- @Test
- public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionSpecInvalidException.class);
- exception.expectMessage(
- String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
- catalog.createPartition(path1, partitionSpec, createPartition(), false);
- }
-
- @Test
- public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
- CatalogPartition partition = createPartition();
- catalog.createPartition(path1, createPartitionSpec(), partition, false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
- exception.expect(PartitionAlreadyExistsException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s already exists.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.createPartition(path1, partitionSpec, createPartition(), false);
- }
-
- @Test
- public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- catalog.createPartition(path1, partitionSpec, createPartition(), false);
- catalog.createPartition(path1, partitionSpec, createPartition(), true);
- }
-
- @Test
- public void testDropPartition() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
- catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-
- assertEquals(Arrays.asList(createPartitionSpec()), catalog.listPartitions(path1));
-
- catalog.dropPartition(path1, createPartitionSpec(), false);
-
- assertEquals(Arrays.asList(), catalog.listPartitions(path1));
- }
-
- @Test
- public void testDropPartition_PartitionNotExistException_TableNotExist() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.dropPartition(path1, partitionSpec, false);
- }
-
- @Test
- public void testDropPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.dropPartition(path1, partitionSpec, false);
- }
-
- @Test
- public void testDropPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.dropPartition(path1, partitionSpec, false);
- }
-
- @Test
- public void testDropPartition_PartitionNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.dropPartition(path1, partitionSpec, false);
- }
-
- @Test
- public void testDropPartition_PartitionNotExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
- catalog.dropPartition(path1, createPartitionSpec(), true);
- }
-
- @Test
- public void testAlterPartition() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- catalog.createPartition(path1, partitionSpec, createPartition(), false);
-
- assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1));
-
- CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
- CatalogTestUtil.checkEquals(createPartition(), cp);
-
- assertNull(cp.getProperties().get("k"));
-
- Map<String, String> partitionProperties = getBatchTableProperties();
- partitionProperties.put("k", "v");
-
- CatalogPartition another = createPartition(partitionProperties);
- catalog.alterPartition(path1, createPartitionSpec(), another, false);
-
- assertEquals(Arrays.asList(createPartitionSpec()), catalog.listPartitions(path1));
-
- cp = catalog.getPartition(path1, createPartitionSpec());
- CatalogTestUtil.checkEquals(another, cp);
-
- assertEquals("v", cp.getProperties().get("k"));
- }
-
- @Test
public void testAlterPartition_differentTypedPartition() throws Exception {
catalog.createDatabase(db1, createDb(), false);
catalog.createTable(path1, createPartitionedTable(), false);
@@ -305,140 +109,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
catalog.alterPartition(path1, partitionSpec, new TestPartition(), false);
}
- @Test
- public void testAlterPartition_PartitionNotExistException_TableNotExist() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.alterPartition(path1, partitionSpec, createPartition(), false);
- }
-
- @Test
- public void testAlterPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.alterPartition(path1, partitionSpec, createPartition(), false);
- }
-
- @Test
- public void testAlterPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.alterPartition(path1, partitionSpec, createPartition(), false);
- }
-
- @Test
- public void testAlterPartition_PartitionNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
-
- CatalogPartition catalogPartition = createPartition();
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.alterPartition(path1, partitionSpec, catalogPartition, false);
- }
-
- @Test
- public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
- catalog.alterPartition(path1, createPartitionSpec(), createPartition(), true);
- }
-
- @Test
- public void testGetPartition_PartitionNotExistException_TableNotExist() throws Exception {
- exception.expect(PartitionNotExistException.class);
- catalog.getPartition(path1, createPartitionSpec());
- }
-
- @Test
- public void testGetPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
- path1.getFullName(), TEST_CATALOG_NAME));
- catalog.getPartition(path1, partitionSpec);
- }
-
- @Test
- public void testGetPartition_PartitionSpecInvalidException_invalidPartitionSpec() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.getPartition(path1, partitionSpec);
- }
-
- @Test
- public void testGetPartition_PartitionNotExistException_PartitionSpecInvalid_sizeNotEqual() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
- new HashMap<String, String>() {{
- put("second", "bob");
- }}
- );
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.getPartition(path1, partitionSpec);
- }
-
- @Test
- public void testGetPartition_PartitionNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- exception.expect(PartitionNotExistException.class);
- exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s does not exist.",
- partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
- catalog.getPartition(path1, partitionSpec);
- }
-
- @Test
- public void testPartitionExists() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
- catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-
- assertTrue(catalog.partitionExists(path1, createPartitionSpec()));
- assertFalse(catalog.partitionExists(path2, createPartitionSpec()));
- assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
- }
-
// ------ statistics ------
@Test
@@ -542,48 +212,11 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
TEST_COMMENT);
}
- private CatalogPartitionSpec createPartitionSpec() {
- return new CatalogPartitionSpec(
- new HashMap<String, String>() {{
- put("third", "2000");
- put("second", "bob");
- }});
- }
-
- private CatalogPartitionSpec createAnotherPartitionSpec() {
- return new CatalogPartitionSpec(
- new HashMap<String, String>() {{
- put("third", "2010");
- put("second", "bob");
- }});
- }
-
- private CatalogPartitionSpec createPartitionSpecSubset() {
- return new CatalogPartitionSpec(
- new HashMap<String, String>() {{
- put("second", "bob");
- }});
- }
-
- private CatalogPartitionSpec createInvalidPartitionSpecSubset() {
- return new CatalogPartitionSpec(
- new HashMap<String, String>() {{
- put("third", "2010");
- }});
- }
-
- private CatalogPartition createPartition() {
- return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
- }
-
- private CatalogPartition createAnotherPartition() {
+ @Override
+ public CatalogPartition createPartition() {
return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
}
- private CatalogPartition createPartition(Map<String, String> props) {
- return new GenericCatalogPartition(props, "Generic catalog table");
- }
-
@Override
public CatalogView createView() {
return new GenericCatalogView(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index cd5be1a..20c4b7e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -273,7 +273,7 @@ public interface Catalog {
* @param partitionSpec partition spec of partition to get
* @return the requested partition
*
- * @throws PartitionNotExistException thrown if the partition is not partitioned
+ * @throws PartitionNotExistException thrown if the partition doesn't exist
* @throws CatalogException in case of any runtime exception
*/
CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 5bfb98a..88d195e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -27,8 +27,12 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.functions.ScalarFunction;
import org.junit.After;
@@ -38,6 +42,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -47,6 +52,7 @@ import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -753,6 +759,330 @@ public abstract class CatalogTestBase {
catalog.dropDatabase(db1, false);
}
+ // ------ partitions ------
+
+ @Test
+ public void testCreatePartition() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ assertTrue(catalog.listPartitions(path1).isEmpty());
+
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
+ checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
+
+ catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
+
+ assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1));
+ assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
+ checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
+ }
+
+ @Test
+ public void testCreatePartition_TableNotExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage(
+ String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_TableNotPartitionedException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ exception.expect(TableNotPartitionedException.class);
+ exception.expectMessage(
+ String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
+ partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.createPartition(path1, partitionSpec, createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ CatalogPartition partition = createPartition();
+ catalog.createPartition(path1, createPartitionSpec(), partition, false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+
+ exception.expect(PartitionAlreadyExistsException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s already exists.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.createPartition(path1, partitionSpec, createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ catalog.createPartition(path1, partitionSpec, createPartition(), false);
+ catalog.createPartition(path1, partitionSpec, createPartition(), true);
+ }
+
+ @Test
+ public void testDropPartition() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
+ catalog.dropPartition(path1, createPartitionSpec(), false);
+
+ assertEquals(Collections.emptyList(), catalog.listPartitions(path1));
+ }
+
+ @Test
+ public void testDropPartition_TableNotExist() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.dropPartition(path1, partitionSpec, false);
+ }
+
+ @Test
+ public void testDropPartition_TableNotPartitioned() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.dropPartition(path1, partitionSpec, false);
+ }
+
+ @Test
+ public void testDropPartition_PartitionSpecInvalid() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.dropPartition(path1, partitionSpec, false);
+ }
+
+ @Test
+ public void testDropPartition_PartitionNotExist() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.dropPartition(path1, partitionSpec, false);
+ }
+
+ @Test
+ public void testDropPartition_PartitionNotExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.dropPartition(path1, createPartitionSpec(), true);
+ }
+
+ @Test
+ public void testAlterPartition() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+ CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
+ checkEquals(createPartition(), cp);
+ assertNull(cp.getProperties().get("k"));
+
+ CatalogPartition another = createPartition();
+ another.getProperties().put("k", "v");
+ catalog.alterPartition(path1, createPartitionSpec(), another, false);
+
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+ cp = catalog.getPartition(path1, createPartitionSpec());
+ checkEquals(another, cp);
+ assertEquals("v", cp.getProperties().get("k"));
+ }
+
+ @Test
+ public void testAlterPartition_TableNotExist() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+ }
+
+ @Test
+ public void testAlterPartition_TableNotPartitioned() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+ }
+
+ @Test
+ public void testAlterPartition_PartitionSpecInvalid() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+ }
+
+ @Test
+ public void testAlterPartition_PartitionNotExist() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+ }
+
+ @Test
+ public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.alterPartition(path1, createPartitionSpec(), createPartition(), true);
+ }
+
+ @Test
+ public void testGetPartition_TableNotExist() throws Exception {
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+ path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testGetPartition_TableNotPartitioned() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+ path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("second", "bob");
+ }}
+ );
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testGetPartition_PartitionNotExist() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s does not exist.",
+ partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testPartitionExists() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+ assertTrue(catalog.partitionExists(path1, createPartitionSpec()));
+ assertFalse(catalog.partitionExists(path2, createPartitionSpec()));
+ assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
+ }
+
+ @Test
+ public void testListPartitionPartialSpec() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+ catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
+
+ assertEquals(2, catalog.listPartitions(path1, createPartitionSpecSubset()).size());
+ assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
+ }
+
// ------ utilities ------
/**
@@ -832,6 +1162,13 @@ public abstract class CatalogTestBase {
*/
protected abstract CatalogFunction createAnotherFunction();
+ /**
+ * Creates a CatalogPartition by specific catalog implementation.
+ *
+ * @return a CatalogPartition
+ */
+ public abstract CatalogPartition createPartition();
+
protected TableSchema createTableSchema() {
return new TableSchema(
new String[] {"first", "second", "third"},
@@ -858,6 +1195,44 @@ public abstract class CatalogTestBase {
return Arrays.asList("second", "third");
}
+ protected CatalogPartitionSpec createPartitionSpec() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2000");
+ put("second", "bob");
+ }});
+ }
+
+ protected CatalogPartitionSpec createAnotherPartitionSpec() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2010");
+ put("second", "bob");
+ }});
+ }
+
+ protected CatalogPartitionSpec createPartitionSpecSubset() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("second", "bob");
+ }});
+ }
+
+ protected CatalogPartitionSpec createAnotherPartitionSpecSubset() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2000");
+ }}
+ );
+ }
+
+ protected CatalogPartitionSpec createInvalidPartitionSpecSubset() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2010");
+ }});
+ }
+
protected Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {{
put(IS_STREAMING, "false");
@@ -1032,4 +1407,8 @@ public abstract class CatalogTestBase {
assertEquals(f1.getClassName(), f2.getClassName());
assertEquals(f1.getProperties(), f2.getProperties());
}
+
+ protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
+ assertEquals(expected.getProperties(), actual.getProperties());
+ }
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 2b98091..2b7593e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -50,10 +50,6 @@ public class CatalogTestUtil {
assertEquals(d1.getProperties(), d2.getProperties());
}
- public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
- assertEquals(p1.getProperties(), p2.getProperties());
- }
-
static void checkEquals(CatalogTableStatistics ts1, CatalogTableStatistics ts2) {
assertEquals(ts1.getRowCount(), ts2.getRowCount());
assertEquals(ts1.getFileCount(), ts2.getFileCount());