You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/02 18:16:13 UTC

[flink] branch master updated: [FLINK-13021][table][hive] unify catalog partition implementations

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ff00d  [FLINK-13021][table][hive] unify catalog partition implementations
59ff00d is described below

commit 59ff00d71d298fa61a92efa4fecd46f3cefc50f6
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon Jul 1 12:31:47 2019 -0700

    [FLINK-13021][table][hive] unify catalog partition implementations
    
    This PR unifies catalog partition implementations.
    
    This closes #8926.
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 54 ++++++++++++-------
 .../table/catalog/hive/HiveCatalogConfig.java      | 17 ++----
 .../table/catalog/hive/HiveCatalogPartition.java   | 61 ----------------------
 .../table/catalog/hive/HivePartitionConfig.java    | 17 ++----
 .../connectors/hive/HiveTableOutputFormatTest.java |  9 ++--
 .../batch/connectors/hive/HiveTableSinkTest.java   |  8 +--
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 19 -------
 flink-python/pyflink/table/catalog.py              | 26 +--------
 flink-python/pyflink/table/tests/test_catalog.py   |  8 +--
 ...logPartition.java => CatalogPartitionImpl.java} | 23 ++++++--
 .../table/catalog/GenericCatalogPartition.java     | 52 ------------------
 .../flink/table/catalog/CatalogTestBase.java       |  5 ++
 .../table/catalog/GenericInMemoryCatalogTest.java  | 26 ---------
 .../flink/table/catalog/CatalogPartition.java      |  7 +++
 .../apache/flink/table/catalog/CatalogTest.java    | 40 +++-----------
 .../flink/table/catalog/CatalogTestUtil.java       | 13 +++++
 16 files changed, 111 insertions(+), 274 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 8659a80..03ddceb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,15 +29,14 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
-import org.apache.flink.table.catalog.GenericCatalogPartition;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.config.CatalogTableConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -483,7 +482,7 @@ public class HiveCatalog extends AbstractCatalog {
 		if (isGeneric) {
 			properties = retrieveFlinkProperties(properties);
 		}
-		String comment = properties.remove(CatalogTableConfig.TABLE_COMMENT);
+		String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
 		// Table schema
 		TableSchema tableSchema =
@@ -515,7 +514,7 @@ public class HiveCatalog extends AbstractCatalog {
 
 		Map<String, String> properties = new HashMap<>(table.getProperties());
 		// Table comment
-		properties.put(CatalogTableConfig.TABLE_COMMENT, table.getComment());
+		properties.put(HiveCatalogConfig.COMMENT, table.getComment());
 
 		boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
 
@@ -623,8 +622,10 @@ public class HiveCatalog extends AbstractCatalog {
 		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
 		checkNotNull(partition, "Partition cannot be null");
 
-		if (!(partition instanceof HiveCatalogPartition)) {
-			throw new CatalogException("Currently only supports HiveCatalogPartition");
+		boolean isGeneric = Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+		if (isGeneric) {
+			throw new CatalogException("Currently only supports non-generic CatalogPartition");
 		}
 
 		Table hiveTable = getHiveTable(tablePath);
@@ -715,7 +716,14 @@ public class HiveCatalog extends AbstractCatalog {
 
 		try {
 			Partition hivePartition = getHivePartition(tablePath, partitionSpec);
-			return instantiateCatalogPartition(hivePartition);
+
+			Map<String, String> properties = hivePartition.getParameters();
+
+			properties.put(HivePartitionConfig.PARTITION_LOCATION, hivePartition.getSd().getLocation());
+
+			String comment = properties.remove(HiveCatalogConfig.COMMENT);
+
+			return new CatalogPartitionImpl(properties, comment);
 		} catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
 			throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
 		} catch (TException e) {
@@ -731,8 +739,10 @@ public class HiveCatalog extends AbstractCatalog {
 		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
 		checkNotNull(newPartition, "New partition cannot be null");
 
-		if (!(newPartition instanceof HiveCatalogPartition)) {
-			throw new CatalogException("Currently only supports HiveCatalogPartition");
+		boolean isGeneric = Boolean.valueOf(newPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+		if (isGeneric) {
+			throw new CatalogException("Currently only supports non-generic CatalogPartition");
 		}
 
 		// Explicitly check if the partition exists or not
@@ -771,11 +781,12 @@ public class HiveCatalog extends AbstractCatalog {
 
 	// make sure both table and partition are generic, or neither is
 	private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
-		boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
-		if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
-			(!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
+		boolean tableIsGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
+		boolean partitionIsGeneric = Boolean.valueOf(catalogPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+		if (tableIsGeneric != partitionIsGeneric) {
 			throw new CatalogException(String.format("Cannot handle %s partition for %s table",
-				catalogPartition.getClass().getName(), isGeneric ? "generic" : "non-generic"));
+				catalogPartition.getClass().getName(), tableIsGeneric ? "generic" : "non-generic"));
 		}
 	}
 
@@ -792,15 +803,18 @@ public class HiveCatalog extends AbstractCatalog {
 			}
 		}
 		// TODO: handle GenericCatalogPartition
-		HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
 		StorageDescriptor sd = hiveTable.getSd().deepCopy();
-		sd.setLocation(hiveCatalogPartition.getLocation());
-		return HiveTableUtil.createHivePartition(hiveTable.getDbName(), hiveTable.getTableName(), partValues,
-				sd, hiveCatalogPartition.getProperties());
-	}
+		sd.setLocation(catalogPartition.getProperties().remove(HivePartitionConfig.PARTITION_LOCATION));
+
+		Map<String, String> properties = new HashMap<>(catalogPartition.getProperties());
+		properties.put(HiveCatalogConfig.COMMENT, catalogPartition.getComment());
 
-	private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
-		return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation());
+		return HiveTableUtil.createHivePartition(
+				hiveTable.getDbName(),
+				hiveTable.getTableName(),
+				partValues,
+				sd,
+				properties);
 	}
 
 	private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
similarity index 63%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
index 6d0c514..684f50e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
@@ -16,21 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.config;
-
-import org.apache.flink.table.catalog.CatalogBaseTable;
+package org.apache.flink.table.catalog.hive;
 
 /**
- * Config for {@link CatalogBaseTable}.
+ * Configs for catalog meta-objects in {@link HiveCatalog}.
  */
-public class CatalogTableConfig {
-
-	// Comment of catalog table
-	public static final String TABLE_COMMENT = "comment";
-
-	// Partition keys of catalog table
-	public static final String TABLE_PARTITION_KEYS = "partition-keys";
+public class HiveCatalogConfig {
 
-	// Prefix for properties of catalog table
-	public static final String TABLE_PROPERTIES = "properties";
+	public static final String COMMENT = "comment";
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
deleted file mode 100644
index 98b13a2..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.catalog.AbstractCatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartition;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A CatalogPartition implementation that represents a Partition in Hive.
- */
-public class HiveCatalogPartition extends AbstractCatalogPartition {
-	private final String location;
-
-	public HiveCatalogPartition(Map<String, String> properties, String location) {
-		super(properties, null);
-		this.location = location;
-	}
-
-	public HiveCatalogPartition(Map<String, String> properties) {
-		this(properties, null);
-	}
-
-	public String getLocation() {
-		return location;
-	}
-
-	@Override
-	public CatalogPartition copy() {
-		return new HiveCatalogPartition(new HashMap<>(getProperties()), location);
-	}
-
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.empty();
-	}
-
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.empty();
-	}
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
similarity index 63%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
index 6d0c514..0551b72 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
@@ -16,21 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.config;
-
-import org.apache.flink.table.catalog.CatalogBaseTable;
+package org.apache.flink.table.catalog.hive;
 
 /**
- * Config for {@link CatalogBaseTable}.
+ * Configs for partition in {@link HiveCatalog}.
  */
-public class CatalogTableConfig {
-
-	// Comment of catalog table
-	public static final String TABLE_COMMENT = "comment";
-
-	// Partition keys of catalog table
-	public static final String TABLE_PARTITION_KEYS = "partition-keys";
+public class HivePartitionConfig {
+	public static final String PARTITION_LOCATION = "partition.location";
 
-	// Prefix for properties of catalog table
-	public static final String TABLE_PROPERTIES = "properties";
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index 206f831..89dfc8a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
+import org.apache.flink.table.catalog.hive.HivePartitionConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.types.Row;
 
@@ -128,9 +129,11 @@ public class HiveTableOutputFormatTest {
 
 		// make sure new partition is created
 		assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-		HiveCatalogPartition catalogPartition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
+		CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
 				partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
-		verifyWrittenData(new Path(catalogPartition.getLocation(), "0"), toWrite, 1);
+
+		String partitionLocation = catalogPartition.getProperties().get(HivePartitionConfig.PARTITION_LOCATION);
+		verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
 
 		hiveCatalog.dropTable(tablePath, false);
 	}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 6b25adb..55f5336 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,12 +23,13 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
+import org.apache.flink.table.catalog.hive.HivePartitionConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
@@ -121,8 +122,9 @@ public class HiveTableSinkTest {
 		List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
 		assertEquals(toWrite.size(), partitionSpecs.size());
 		for (int i = 0; i < toWrite.size(); i++) {
-			HiveCatalogPartition partition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
-			verifyWrittenData(new Path(partition.getLocation(), "0"), Collections.singletonList(toWrite.get(i)), 1);
+			CatalogPartition partition = hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
+			String partitionLocation = partition.getProperties().get(HivePartitionConfig.PARTITION_LOCATION);
+			verifyWrittenData(new Path(partitionLocation, "0"), Collections.singletonList(toWrite.get(i)), 1);
 		}
 
 		hiveCatalog.dropTable(tablePath, false);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index e641e23..c10d31c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -43,9 +42,7 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Test for HiveCatalog on Hive metadata.
@@ -137,20 +134,4 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 			"Hive table cannot be streaming."
 		);
 	}
-
-	@Override
-	public CatalogPartition createPartition() {
-		return new HiveCatalogPartition(getBatchTableProperties());
-	}
-
-	@Override
-	protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
-		assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
-		assertEquals(expected.getClass(), actual.getClass());
-		HiveCatalogPartition hivePartition1 = (HiveCatalogPartition) expected;
-		HiveCatalogPartition hivePartition2 = (HiveCatalogPartition) actual;
-		assertEquals(hivePartition1.getDescription(), hivePartition2.getDescription());
-		assertEquals(hivePartition1.getDetailedDescription(), hivePartition2.getDetailedDescription());
-		assertTrue(hivePartition2.getProperties().entrySet().containsAll(hivePartition1.getProperties().entrySet()));
-	}
 }
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index d283d16..2748d77 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -23,8 +23,7 @@ from pyflink.table.table_schema import TableSchema
 
 __all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
            'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
-           'CatalogColumnStatistics', 'HiveCatalog',
-           'HiveCatalogPartition']
+           'CatalogColumnStatistics', 'HiveCatalog']
 
 
 class Catalog(object):
@@ -695,11 +694,7 @@ class CatalogPartition(object):
 
     @staticmethod
     def _get(j_catalog_partition):
-        if j_catalog_partition.getClass().getName() == \
-                "org.apache.flink.table.catalog.hive.HiveCatalogPartition":
-            return HiveCatalogPartition(j_hive_catalog_partition=j_catalog_partition)
-        else:
-            return CatalogPartition(j_catalog_partition)
+        return CatalogPartition(j_catalog_partition)
 
     def get_properties(self):
         """
@@ -973,20 +968,3 @@ class HiveCatalog(Catalog):
             j_hive_catalog = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalog(
                 catalog_name, default_database, hive_site_url)
         super(HiveCatalog, self).__init__(j_hive_catalog)
-
-
-class HiveCatalogPartition(CatalogPartition):
-    """
-    A CatalogPartition implementation that represents a Partition in Hive.
-    """
-
-    def __int__(self, properties=None, location=None, j_hive_catalog_partition=None):
-        gateway = get_gateway()
-        if j_hive_catalog_partition is None:
-            j_hive_catalog_partition = \
-                gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalogPartition(
-                    properties, location)
-        super(HiveCatalogPartition, self).__init__(j_hive_catalog_partition)
-
-    def get_location(self):
-        return self._j_catalog_partition.getLocation()
diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py
index 5a340f7..81ea53a 100644
--- a/flink-python/pyflink/table/tests/test_catalog.py
+++ b/flink-python/pyflink/table/tests/test_catalog.py
@@ -204,8 +204,8 @@ class CatalogTestBase(PyFlinkTestCase):
     @staticmethod
     def create_partition():
         gateway = get_gateway()
-        j_partition = gateway.jvm.GenericCatalogPartition(
-            CatalogTestBase.get_batch_table_properties(), "Generic batch table")
+        j_partition = gateway.jvm.CatalogPartitionImpl(
+            CatalogTestBase.get_batch_table_properties(), "catalog partition tests")
         return CatalogPartition(j_partition)
 
     @staticmethod
@@ -808,8 +808,8 @@ class CatalogTestBase(PyFlinkTestCase):
         self.assertIsNone(cp.get_properties().get("k"))
 
         gateway = get_gateway()
-        j_partition = gateway.jvm.GenericCatalogPartition(
-            {"is_streaming": "false", "k": "v"}, "Generic batch table")
+        j_partition = gateway.jvm.CatalogPartitionImpl(
+            {"is_streaming": "false", "k": "v"}, "catalog partition")
         another = CatalogPartition(j_partition)
         self.catalog.alter_partition(self.path1, self.create_partition_spec(), another, False)
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
similarity index 69%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
index 818d72a..f082cdc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.table.catalog;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An abstract catalog partition implementation.
+ * A catalog partition implementation.
  */
-public abstract class AbstractCatalogPartition implements CatalogPartition {
+public class CatalogPartitionImpl implements CatalogPartition {
 	private final Map<String, String> properties;
 	private final String comment;
 
-	public AbstractCatalogPartition(Map<String, String> properties, String comment) {
+	public CatalogPartitionImpl(Map<String, String> properties, String comment) {
 		this.properties = checkNotNull(properties, "properties cannot be null");
 		this.comment = comment;
 	}
@@ -39,8 +41,23 @@ public abstract class AbstractCatalogPartition implements CatalogPartition {
 		return properties;
 	}
 
+	@Override
 	public String getComment() {
 		return comment;
 	}
 
+	@Override
+	public CatalogPartition copy() {
+		return new CatalogPartitionImpl(new HashMap<>(properties), comment);
+	}
+
+	@Override
+	public Optional<String> getDescription() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<String> getDetailedDescription() {
+		return Optional.empty();
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
deleted file mode 100644
index 1a4563f..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.catalog.config.CatalogConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A generic catalog partition implementation.
- */
-public class GenericCatalogPartition extends AbstractCatalogPartition {
-
-	public GenericCatalogPartition(Map<String, String> properties, String comment) {
-		super(properties, comment);
-		properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
-	}
-
-	@Override
-	public CatalogPartition copy() {
-		return new GenericCatalogPartition(new HashMap<>(getProperties()), getComment());
-	}
-
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(getComment());
-	}
-
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a generic catalog partition with detailed description");
-	}
-
-}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 801914f..19e9123 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -96,6 +96,11 @@ public abstract class CatalogTestBase extends CatalogTest {
 	}
 
 	@Override
+	public CatalogPartition createPartition() {
+		return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
+	}
+
+	@Override
 	public CatalogView createView() {
 		return new CatalogViewImpl(
 			String.format("select * from %s", t1),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 0014501..45c511b 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -88,26 +87,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
 	}
 
-	// ------ partitions ------
-
-	@Test
-	public void testAlterPartition_differentTypedPartition() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		CatalogPartition partition = createPartition();
-		catalog.createPartition(path1, partitionSpec, partition, false);
-
-		exception.expect(CatalogException.class);
-		exception.expectMessage(
-			String.format("Partition types don't match. " +
-				"Existing partition is '%s' and " +
-				"new partition is 'org.apache.flink.table.catalog.CatalogTest$TestPartition'.",
-				partition.getClass().getName()));
-		catalog.alterPartition(path1, partitionSpec, new TestPartition(), false);
-	}
-
 	// ------ statistics ------
 
 	@Test
@@ -156,11 +135,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		return true;
 	}
 
-	@Override
-	public CatalogPartition createPartition() {
-		return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
-	}
-
 	private CatalogColumnStatistics createColumnStats() {
 		CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
 		CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
index 47dce25..7c36ed8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
@@ -34,6 +34,13 @@ public interface CatalogPartition {
 	Map<String, String> getProperties();
 
 	/**
+	 * Get comment of the partition.
+	 *
+	 * @return comment of the partition
+	 */
+	String getComment();
+
+	/**
 	 * Get a deep copy of the CatalogPartition instance.
 	 *
 	 * @return a copy of CatalogPartition instance
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 757a4c3..0c2b632 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -742,13 +742,13 @@ public abstract class CatalogTest {
 
 		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
 		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
-		checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
+		CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
 
 		catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
 
 		assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1));
 		assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
-		checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
+		CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
 	}
 
 	@Test
@@ -891,16 +891,19 @@ public abstract class CatalogTest {
 
 		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
 		CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
-		checkEquals(createPartition(), cp);
+		CatalogTestUtil.checkEquals(createPartition(), cp);
 		assertNull(cp.getProperties().get("k"));
 
 		CatalogPartition another = createPartition();
 		another.getProperties().put("k", "v");
+
 		catalog.alterPartition(path1, createPartitionSpec(), another, false);
 
 		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
 		cp = catalog.getPartition(path1, createPartitionSpec());
-		checkEquals(another, cp);
+
+		CatalogTestUtil.checkEquals(another, cp);
 		assertEquals("v", cp.getProperties().get("k"));
 	}
 
@@ -1233,31 +1236,6 @@ public abstract class CatalogTest {
 		}
 	}
 
-	/**
-	 * Test partition used to assert on partition of different class.
-	 */
-	public static class TestPartition implements CatalogPartition {
-		@Override
-		public Map<String, String> getProperties() {
-			return null;
-		}
-
-		@Override
-		public CatalogPartition copy() {
-			return null;
-		}
-
-		@Override
-		public Optional<String> getDescription() {
-			return Optional.empty();
-		}
-
-		@Override
-		public Optional<String> getDetailedDescription() {
-			return Optional.empty();
-		}
-	}
-
 	// ------ equality check utils ------
 	// Can be overriden by sub test class
 
@@ -1266,10 +1244,6 @@ public abstract class CatalogTest {
 		assertEquals(f1.getProperties(), f2.getProperties());
 	}
 
-	protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
-		assertEquals(expected.getProperties(), actual.getProperties());
-	}
-
 	protected void checkEquals(CatalogColumnStatistics cs1, CatalogColumnStatistics cs2) {
 		CatalogTestUtil.checkEquals(cs1, cs2);
 	}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 1c64025..53e4ed7 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -80,6 +80,19 @@ public class CatalogTestUtil {
 		}
 	}
 
+	public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
+		assertEquals(p1.getClass(), p2.getClass());
+		assertEquals(p1.getComment(), p2.getComment());
+
+		// Hive tables may have properties created by Hive itself,
+		// thus the properties of a Hive table are a superset of those in its corresponding Flink table
+		if (Boolean.valueOf(p1.getProperties().get(CatalogConfig.IS_GENERIC))) {
+			assertEquals(p1.getProperties(), p2.getProperties());
+		} else {
+			assertTrue(p2.getProperties().entrySet().containsAll(p1.getProperties().entrySet()));
+		}
+	}
+
 	public static void checkEquals(TableStats ts1, TableStats ts2) {
 		assertEquals(ts1.getRowCount(), ts2.getRowCount());
 		assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());