You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/02 18:16:13 UTC
[flink] branch master updated: [FLINK-13021][table][hive] unify catalog partition implementations
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 59ff00d [FLINK-13021][table][hive] unify catalog partition implementations
59ff00d is described below
commit 59ff00d71d298fa61a92efa4fecd46f3cefc50f6
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon Jul 1 12:31:47 2019 -0700
[FLINK-13021][table][hive] unify catalog partition implementations
This PR unifies catalog partition implementations.
This closes #8926.
---
.../flink/table/catalog/hive/HiveCatalog.java | 54 ++++++++++++-------
.../table/catalog/hive/HiveCatalogConfig.java | 17 ++----
.../table/catalog/hive/HiveCatalogPartition.java | 61 ----------------------
.../table/catalog/hive/HivePartitionConfig.java | 17 ++----
.../connectors/hive/HiveTableOutputFormatTest.java | 9 ++--
.../batch/connectors/hive/HiveTableSinkTest.java | 8 +--
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 19 -------
flink-python/pyflink/table/catalog.py | 26 +--------
flink-python/pyflink/table/tests/test_catalog.py | 8 +--
...logPartition.java => CatalogPartitionImpl.java} | 23 ++++++--
.../table/catalog/GenericCatalogPartition.java | 52 ------------------
.../flink/table/catalog/CatalogTestBase.java | 5 ++
.../table/catalog/GenericInMemoryCatalogTest.java | 26 ---------
.../flink/table/catalog/CatalogPartition.java | 7 +++
.../apache/flink/table/catalog/CatalogTest.java | 40 +++-----------
.../flink/table/catalog/CatalogTestUtil.java | 13 +++++
16 files changed, 111 insertions(+), 274 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 8659a80..03ddceb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,15 +29,14 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
-import org.apache.flink.table.catalog.GenericCatalogPartition;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.config.CatalogTableConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -483,7 +482,7 @@ public class HiveCatalog extends AbstractCatalog {
if (isGeneric) {
properties = retrieveFlinkProperties(properties);
}
- String comment = properties.remove(CatalogTableConfig.TABLE_COMMENT);
+ String comment = properties.remove(HiveCatalogConfig.COMMENT);
// Table schema
TableSchema tableSchema =
@@ -515,7 +514,7 @@ public class HiveCatalog extends AbstractCatalog {
Map<String, String> properties = new HashMap<>(table.getProperties());
// Table comment
- properties.put(CatalogTableConfig.TABLE_COMMENT, table.getComment());
+ properties.put(HiveCatalogConfig.COMMENT, table.getComment());
boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
@@ -623,8 +622,10 @@ public class HiveCatalog extends AbstractCatalog {
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
checkNotNull(partition, "Partition cannot be null");
- if (!(partition instanceof HiveCatalogPartition)) {
- throw new CatalogException("Currently only supports HiveCatalogPartition");
+ boolean isGeneric = Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+ if (isGeneric) {
+ throw new CatalogException("Currently only supports non-generic CatalogPartition");
}
Table hiveTable = getHiveTable(tablePath);
@@ -715,7 +716,14 @@ public class HiveCatalog extends AbstractCatalog {
try {
Partition hivePartition = getHivePartition(tablePath, partitionSpec);
- return instantiateCatalogPartition(hivePartition);
+
+ Map<String, String> properties = hivePartition.getParameters();
+
+ properties.put(HivePartitionConfig.PARTITION_LOCATION, hivePartition.getSd().getLocation());
+
+ String comment = properties.remove(HiveCatalogConfig.COMMENT);
+
+ return new CatalogPartitionImpl(properties, comment);
} catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
} catch (TException e) {
@@ -731,8 +739,10 @@ public class HiveCatalog extends AbstractCatalog {
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
checkNotNull(newPartition, "New partition cannot be null");
- if (!(newPartition instanceof HiveCatalogPartition)) {
- throw new CatalogException("Currently only supports HiveCatalogPartition");
+ boolean isGeneric = Boolean.valueOf(newPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+ if (isGeneric) {
+ throw new CatalogException("Currently only supports non-generic CatalogPartition");
}
// Explicitly check if the partition exists or not
@@ -771,11 +781,12 @@ public class HiveCatalog extends AbstractCatalog {
// make sure both table and partition are generic, or neither is
private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
- boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
- if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
- (!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
+ boolean tableIsGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
+ boolean partitionIsGeneric = Boolean.valueOf(catalogPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+ if (tableIsGeneric != partitionIsGeneric) {
throw new CatalogException(String.format("Cannot handle %s partition for %s table",
- catalogPartition.getClass().getName(), isGeneric ? "generic" : "non-generic"));
+ catalogPartition.getClass().getName(), tableIsGeneric ? "generic" : "non-generic"));
}
}
@@ -792,15 +803,18 @@ public class HiveCatalog extends AbstractCatalog {
}
}
// TODO: handle GenericCatalogPartition
- HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
StorageDescriptor sd = hiveTable.getSd().deepCopy();
- sd.setLocation(hiveCatalogPartition.getLocation());
- return HiveTableUtil.createHivePartition(hiveTable.getDbName(), hiveTable.getTableName(), partValues,
- sd, hiveCatalogPartition.getProperties());
- }
+ sd.setLocation(catalogPartition.getProperties().remove(HivePartitionConfig.PARTITION_LOCATION));
+
+ Map<String, String> properties = new HashMap<>(catalogPartition.getProperties());
+ properties.put(HiveCatalogConfig.COMMENT, catalogPartition.getComment());
- private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
- return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation());
+ return HiveTableUtil.createHivePartition(
+ hiveTable.getDbName(),
+ hiveTable.getTableName(),
+ partValues,
+ sd,
+ properties);
}
private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
similarity index 63%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
index 6d0c514..684f50e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
@@ -16,21 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog.config;
-
-import org.apache.flink.table.catalog.CatalogBaseTable;
+package org.apache.flink.table.catalog.hive;
/**
- * Config for {@link CatalogBaseTable}.
+ * Configs for catalog meta-objects in {@link HiveCatalog}.
*/
-public class CatalogTableConfig {
-
- // Comment of catalog table
- public static final String TABLE_COMMENT = "comment";
-
- // Partition keys of catalog table
- public static final String TABLE_PARTITION_KEYS = "partition-keys";
+public class HiveCatalogConfig {
- // Prefix for properties of catalog table
- public static final String TABLE_PROPERTIES = "properties";
+ public static final String COMMENT = "comment";
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
deleted file mode 100644
index 98b13a2..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.catalog.AbstractCatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartition;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A CatalogPartition implementation that represents a Partition in Hive.
- */
-public class HiveCatalogPartition extends AbstractCatalogPartition {
- private final String location;
-
- public HiveCatalogPartition(Map<String, String> properties, String location) {
- super(properties, null);
- this.location = location;
- }
-
- public HiveCatalogPartition(Map<String, String> properties) {
- this(properties, null);
- }
-
- public String getLocation() {
- return location;
- }
-
- @Override
- public CatalogPartition copy() {
- return new HiveCatalogPartition(new HashMap<>(getProperties()), location);
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.empty();
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.empty();
- }
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
similarity index 63%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
index 6d0c514..0551b72 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
@@ -16,21 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog.config;
-
-import org.apache.flink.table.catalog.CatalogBaseTable;
+package org.apache.flink.table.catalog.hive;
/**
- * Config for {@link CatalogBaseTable}.
+ * Configs for partition in {@link HiveCatalog}.
*/
-public class CatalogTableConfig {
-
- // Comment of catalog table
- public static final String TABLE_COMMENT = "comment";
-
- // Partition keys of catalog table
- public static final String TABLE_PARTITION_KEYS = "partition-keys";
+public class HivePartitionConfig {
+ public static final String PARTITION_LOCATION = "partition.location";
- // Prefix for properties of catalog table
- public static final String TABLE_PROPERTIES = "properties";
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index 206f831..89dfc8a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
+import org.apache.flink.table.catalog.hive.HivePartitionConfig;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.types.Row;
@@ -128,9 +129,11 @@ public class HiveTableOutputFormatTest {
// make sure new partition is created
assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
- HiveCatalogPartition catalogPartition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
+ CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
- verifyWrittenData(new Path(catalogPartition.getLocation(), "0"), toWrite, 1);
+
+ String partitionLocation = catalogPartition.getProperties().get(HivePartitionConfig.PARTITION_LOCATION);
+ verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
hiveCatalog.dropTable(tablePath, false);
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 6b25adb..55f5336 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,12 +23,13 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
+import org.apache.flink.table.catalog.hive.HivePartitionConfig;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
@@ -121,8 +122,9 @@ public class HiveTableSinkTest {
List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
assertEquals(toWrite.size(), partitionSpecs.size());
for (int i = 0; i < toWrite.size(); i++) {
- HiveCatalogPartition partition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
- verifyWrittenData(new Path(partition.getLocation(), "0"), Collections.singletonList(toWrite.get(i)), 1);
+ CatalogPartition partition = hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
+ String partitionLocation = partition.getProperties().get(HivePartitionConfig.PARTITION_LOCATION);
+ verifyWrittenData(new Path(partitionLocation, "0"), Collections.singletonList(toWrite.get(i)), 1);
}
hiveCatalog.dropTable(tablePath, false);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index e641e23..c10d31c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -43,9 +42,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
/**
* Test for HiveCatalog on Hive metadata.
@@ -137,20 +134,4 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
"Hive table cannot be streaming."
);
}
-
- @Override
- public CatalogPartition createPartition() {
- return new HiveCatalogPartition(getBatchTableProperties());
- }
-
- @Override
- protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
- assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
- assertEquals(expected.getClass(), actual.getClass());
- HiveCatalogPartition hivePartition1 = (HiveCatalogPartition) expected;
- HiveCatalogPartition hivePartition2 = (HiveCatalogPartition) actual;
- assertEquals(hivePartition1.getDescription(), hivePartition2.getDescription());
- assertEquals(hivePartition1.getDetailedDescription(), hivePartition2.getDetailedDescription());
- assertTrue(hivePartition2.getProperties().entrySet().containsAll(hivePartition1.getProperties().entrySet()));
- }
}
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index d283d16..2748d77 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -23,8 +23,7 @@ from pyflink.table.table_schema import TableSchema
__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
- 'CatalogColumnStatistics', 'HiveCatalog',
- 'HiveCatalogPartition']
+ 'CatalogColumnStatistics', 'HiveCatalog']
class Catalog(object):
@@ -695,11 +694,7 @@ class CatalogPartition(object):
@staticmethod
def _get(j_catalog_partition):
- if j_catalog_partition.getClass().getName() == \
- "org.apache.flink.table.catalog.hive.HiveCatalogPartition":
- return HiveCatalogPartition(j_hive_catalog_partition=j_catalog_partition)
- else:
- return CatalogPartition(j_catalog_partition)
+ return CatalogPartition(j_catalog_partition)
def get_properties(self):
"""
@@ -973,20 +968,3 @@ class HiveCatalog(Catalog):
j_hive_catalog = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalog(
catalog_name, default_database, hive_site_url)
super(HiveCatalog, self).__init__(j_hive_catalog)
-
-
-class HiveCatalogPartition(CatalogPartition):
- """
- A CatalogPartition implementation that represents a Partition in Hive.
- """
-
- def __int__(self, properties=None, location=None, j_hive_catalog_partition=None):
- gateway = get_gateway()
- if j_hive_catalog_partition is None:
- j_hive_catalog_partition = \
- gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalogPartition(
- properties, location)
- super(HiveCatalogPartition, self).__init__(j_hive_catalog_partition)
-
- def get_location(self):
- return self._j_catalog_partition.getLocation()
diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py
index 5a340f7..81ea53a 100644
--- a/flink-python/pyflink/table/tests/test_catalog.py
+++ b/flink-python/pyflink/table/tests/test_catalog.py
@@ -204,8 +204,8 @@ class CatalogTestBase(PyFlinkTestCase):
@staticmethod
def create_partition():
gateway = get_gateway()
- j_partition = gateway.jvm.GenericCatalogPartition(
- CatalogTestBase.get_batch_table_properties(), "Generic batch table")
+ j_partition = gateway.jvm.CatalogPartitionImpl(
+ CatalogTestBase.get_batch_table_properties(), "catalog partition tests")
return CatalogPartition(j_partition)
@staticmethod
@@ -808,8 +808,8 @@ class CatalogTestBase(PyFlinkTestCase):
self.assertIsNone(cp.get_properties().get("k"))
gateway = get_gateway()
- j_partition = gateway.jvm.GenericCatalogPartition(
- {"is_streaming": "false", "k": "v"}, "Generic batch table")
+ j_partition = gateway.jvm.CatalogPartitionImpl(
+ {"is_streaming": "false", "k": "v"}, "catalog partition")
another = CatalogPartition(j_partition)
self.catalog.alter_partition(self.path1, self.create_partition_spec(), another, False)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
similarity index 69%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
index 818d72a..f082cdc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
@@ -18,18 +18,20 @@
package org.apache.flink.table.catalog;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * An abstract catalog partition implementation.
+ * A catalog partition implementation.
*/
-public abstract class AbstractCatalogPartition implements CatalogPartition {
+public class CatalogPartitionImpl implements CatalogPartition {
private final Map<String, String> properties;
private final String comment;
- public AbstractCatalogPartition(Map<String, String> properties, String comment) {
+ public CatalogPartitionImpl(Map<String, String> properties, String comment) {
this.properties = checkNotNull(properties, "properties cannot be null");
this.comment = comment;
}
@@ -39,8 +41,23 @@ public abstract class AbstractCatalogPartition implements CatalogPartition {
return properties;
}
+ @Override
public String getComment() {
return comment;
}
+ @Override
+ public CatalogPartition copy() {
+ return new CatalogPartitionImpl(new HashMap<>(properties), comment);
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.empty();
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
deleted file mode 100644
index 1a4563f..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.catalog.config.CatalogConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A generic catalog partition implementation.
- */
-public class GenericCatalogPartition extends AbstractCatalogPartition {
-
- public GenericCatalogPartition(Map<String, String> properties, String comment) {
- super(properties, comment);
- properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
- }
-
- @Override
- public CatalogPartition copy() {
- return new GenericCatalogPartition(new HashMap<>(getProperties()), getComment());
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.of(getComment());
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a generic catalog partition with detailed description");
- }
-
-}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 801914f..19e9123 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -96,6 +96,11 @@ public abstract class CatalogTestBase extends CatalogTest {
}
@Override
+ public CatalogPartition createPartition() {
+ return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
+ }
+
+ @Override
public CatalogView createView() {
return new CatalogViewImpl(
String.format("select * from %s", t1),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 0014501..45c511b 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.catalog;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -88,26 +87,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
}
- // ------ partitions ------
-
- @Test
- public void testAlterPartition_differentTypedPartition() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createPartitionedTable(), false);
-
- CatalogPartitionSpec partitionSpec = createPartitionSpec();
- CatalogPartition partition = createPartition();
- catalog.createPartition(path1, partitionSpec, partition, false);
-
- exception.expect(CatalogException.class);
- exception.expectMessage(
- String.format("Partition types don't match. " +
- "Existing partition is '%s' and " +
- "new partition is 'org.apache.flink.table.catalog.CatalogTest$TestPartition'.",
- partition.getClass().getName()));
- catalog.alterPartition(path1, partitionSpec, new TestPartition(), false);
- }
-
// ------ statistics ------
@Test
@@ -156,11 +135,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
return true;
}
- @Override
- public CatalogPartition createPartition() {
- return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
- }
-
private CatalogColumnStatistics createColumnStats() {
CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
index 47dce25..7c36ed8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
@@ -34,6 +34,13 @@ public interface CatalogPartition {
Map<String, String> getProperties();
/**
+ * Get comment of the partition.
+ *
+ * @return comment of the partition
+ */
+ String getComment();
+
+ /**
* Get a deep copy of the CatalogPartition instance.
*
* @return a copy of CatalogPartition instance
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 757a4c3..0c2b632 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -742,13 +742,13 @@ public abstract class CatalogTest {
assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
- checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
+ CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1));
assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
- checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
+ CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
}
@Test
@@ -891,16 +891,19 @@ public abstract class CatalogTest {
assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
- checkEquals(createPartition(), cp);
+ CatalogTestUtil.checkEquals(createPartition(), cp);
assertNull(cp.getProperties().get("k"));
CatalogPartition another = createPartition();
another.getProperties().put("k", "v");
+
catalog.alterPartition(path1, createPartitionSpec(), another, false);
assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
cp = catalog.getPartition(path1, createPartitionSpec());
- checkEquals(another, cp);
+
+ CatalogTestUtil.checkEquals(another, cp);
assertEquals("v", cp.getProperties().get("k"));
}
@@ -1233,31 +1236,6 @@ public abstract class CatalogTest {
}
}
- /**
- * Test partition used to assert on partition of different class.
- */
- public static class TestPartition implements CatalogPartition {
- @Override
- public Map<String, String> getProperties() {
- return null;
- }
-
- @Override
- public CatalogPartition copy() {
- return null;
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.empty();
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.empty();
- }
- }
-
// ------ equality check utils ------
// Can be overridden by sub test class
@@ -1266,10 +1244,6 @@ public abstract class CatalogTest {
assertEquals(f1.getProperties(), f2.getProperties());
}
- protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
- assertEquals(expected.getProperties(), actual.getProperties());
- }
-
protected void checkEquals(CatalogColumnStatistics cs1, CatalogColumnStatistics cs2) {
CatalogTestUtil.checkEquals(cs1, cs2);
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 1c64025..53e4ed7 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -80,6 +80,19 @@ public class CatalogTestUtil {
}
}
+ public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
+ assertEquals(p1.getClass(), p2.getClass());
+ assertEquals(p1.getComment(), p2.getComment());
+
+ // Hive may add properties of its own to a partition,
+ // thus the properties of a Hive partition are a superset of those in its corresponding Flink partition
+ if (Boolean.valueOf(p1.getProperties().get(CatalogConfig.IS_GENERIC))) {
+ assertEquals(p1.getProperties(), p2.getProperties());
+ } else {
+ assertTrue(p2.getProperties().entrySet().containsAll(p1.getProperties().entrySet()));
+ }
+ }
+
public static void checkEquals(TableStats ts1, TableStats ts2) {
assertEquals(ts1.getRowCount(), ts2.getRowCount());
assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());