You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/24 05:03:52 UTC

[flink] branch master updated: [FLINK-12235][hive] Support partition related operations in HiveCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e1686  [FLINK-12235][hive] Support partition related operations in HiveCatalog
95e1686 is described below

commit 95e16862822922972f70a710bc09b7ec31c9043b
Author: Rui Li <li...@apache.org>
AuthorDate: Wed May 15 19:44:06 2019 +0800

    [FLINK-12235][hive] Support partition related operations in HiveCatalog
    
    This PR adds support for Hive partitions in HiveCatalog.
    
    This closes #8449.
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 270 ++++++++++++++-
 .../table/catalog/hive/HiveCatalogPartition.java   |  61 ++++
 .../hive/HiveCatalogGenericMetadataTest.java       | 108 ++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |  18 +
 .../table/catalog/GenericInMemoryCatalog.java      |  82 ++---
 .../table/catalog/GenericInMemoryCatalogTest.java  | 371 +-------------------
 .../org/apache/flink/table/catalog/Catalog.java    |   2 +-
 .../flink/table/catalog/CatalogTestBase.java       | 379 +++++++++++++++++++++
 .../flink/table/catalog/CatalogTestUtil.java       |   4 -
 9 files changed, 859 insertions(+), 436 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1f0bddc..d2387f0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogFunction;
+import org.apache.flink.table.catalog.GenericCatalogPartition;
 import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -53,6 +54,7 @@ import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -500,7 +503,7 @@ public class HiveCatalog implements Catalog {
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
 		if (!hiveTable.getPartitionKeys().isEmpty()) {
-			partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
+			partitionKeys = getFieldNames(hiveTable.getPartitionKeys());
 		}
 
 		if (isView) {
@@ -608,44 +611,285 @@ public class HiveCatalog implements Catalog {
 	// ------ partitions ------
 
 	@Override
+	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws CatalogException {
+		checkNotNull(tablePath, "Table path cannot be null");
+		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+		try {
+			return getHivePartition(tablePath, partitionSpec) != null;
+		} catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+		}
+	}
+
+	@Override
 	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
 			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(tablePath, "Table path cannot be null");
+		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+		checkNotNull(partition, "Partition cannot be null");
+
+		if (!(partition instanceof HiveCatalogPartition)) {
+			throw new CatalogException("Currently only supports HiveCatalogPartition");
+		}
+
+		Table hiveTable = getHiveTable(tablePath);
+
+		// validate the table/partition pairing and that the table is partitioned before adding anything
+		ensureTableAndPartitionMatch(hiveTable, partition);
+		ensurePartitionedTable(tablePath, hiveTable);
+
+		try {
+			client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to create partition %s of table %s", partitionSpec, tablePath), e);
+		}
 	}
 
 	@Override
 	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
 			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
+		checkNotNull(tablePath, "Table path cannot be null");
+		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
 
-	@Override
-	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try { // table lookup stays inside the try: a missing table or invalid spec is reported as PartitionNotExistException
+			Table hiveTable = getHiveTable(tablePath);
+			client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(),
+				getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+			}
+		} catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath), e);
+		}
 	}
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
 			throws TableNotExistException, TableNotPartitionedException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(tablePath, "Table path cannot be null");
+
+		Table hiveTable = getHiveTable(tablePath);
+
+		ensurePartitionedTable(tablePath, hiveTable);
+
+		try {
+			// pass -1 as max_parts to fetch all partitions
+			return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1).stream()
+				.map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list partitions of table %s", tablePath), e);
+		}
 	}
 
 	@Override
 	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
 			throws TableNotExistException, TableNotPartitionedException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(tablePath, "Table path cannot be null");
+		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+		Table hiveTable = getHiveTable(tablePath);
+
+		ensurePartitionedTable(tablePath, hiveTable);
+
+		try {
+			// partition spec can be partial
+			List<String> partialVals = MetaStoreUtils.getPvals(hiveTable.getPartitionKeys(), partitionSpec.getPartitionSpec());
+			return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), partialVals,
+				(short) -1).stream().map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list partitions of table %s", tablePath), e);
+		}
 	}
 
 	@Override
 	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
 			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		checkNotNull(tablePath, "Table path cannot be null");
+		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+		try {
+			Partition hivePartition = getHivePartition(tablePath, partitionSpec);
+			return instantiateCatalogPartition(hivePartition);
+		} catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+		}
 	}
 
 	@Override
-	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
-		throw new UnsupportedOperationException();
+	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+			throws PartitionNotExistException, CatalogException {
+		checkNotNull(tablePath, "Table path cannot be null");
+		checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+		checkNotNull(newPartition, "New partition cannot be null");
+
+		if (!(newPartition instanceof HiveCatalogPartition)) {
+			throw new CatalogException("Currently only supports HiveCatalogPartition");
+		}
+
+		// Explicitly check if the partition exists or not
+		// because alter_partition() doesn't throw NoSuchObjectException like dropPartition() when the target doesn't exist
+		try {
+			Table hiveTable = getHiveTable(tablePath);
+			ensureTableAndPartitionMatch(hiveTable, newPartition);
+			Partition oldHivePartition = getHivePartition(hiveTable, partitionSpec);
+			if (oldHivePartition == null) {
+				if (ignoreIfNotExists) {
+					return;
+				}
+				throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			}
+			Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
+			if (newHivePartition.getSd().getLocation() == null) {
+				newHivePartition.getSd().setLocation(oldHivePartition.getSd().getLocation());
+			}
+			client.alter_partition(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				newHivePartition
+			);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+			}
+		} catch (InvalidOperationException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to alter existing partition with new partition %s of table %s",
+					partitionSpec, tablePath), e);
+		}
+	}
+
+	// Rejects a Hive partition on a generic table, and a generic partition on a non-generic table.
+	private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
+		final boolean genericTable = Boolean.parseBoolean(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+		final Class<?> mismatch = genericTable ? HiveCatalogPartition.class : GenericCatalogPartition.class;
+		if (mismatch.isInstance(catalogPartition)) {
+			throw new CatalogException(String.format("Cannot handle %s partition for %s table",
+				catalogPartition.getClass().getName(), genericTable ? "generic" : "non-generic"));
+		}
+	}
+
+	private Partition instantiateHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+			throws PartitionSpecInvalidException {
+		Partition partition = new Partition();
+		List<String> partCols = getFieldNames(hiveTable.getPartitionKeys());
+		List<String> partValues = getOrderedFullPartitionValues(
+			partitionSpec, partCols, new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()));
+		// validate partition values
+		for (int i = 0; i < partCols.size(); i++) {
+			if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
+				throw new PartitionSpecInvalidException(catalogName, partCols,
+					new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec);
+			}
+		}
+		// TODO: handle GenericCatalogPartition
+		HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
+		partition.setValues(partValues);
+		partition.setDbName(hiveTable.getDbName());
+		partition.setTableName(hiveTable.getTableName());
+		partition.setCreateTime((int) (System.currentTimeMillis() / 1000));
+		partition.setParameters(hiveCatalogPartition.getProperties());
+		partition.setSd(hiveTable.getSd().deepCopy());
+		partition.getSd().setLocation(hiveCatalogPartition.getLocation());
+
+		return partition;
+	}
+
+	private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
+		// TODO: create GenericCatalogPartition for GenericCatalogTable
+		return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation());
+	}
+
+	private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
+		if (hiveTable.getPartitionKeysSize() == 0) {
+			throw new TableNotPartitionedException(catalogName, tablePath);
+		}
+	}
+
+	/**
+	 * Get field names from field schemas.
+	 */
+	private static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+		List<String> names = new ArrayList<>(fieldSchemas.size());
+		for (FieldSchema fs : fieldSchemas) {
+			names.add(fs.getName());
+		}
+		return names;
+	}
+
+	/**
+	 * Creates a {@link CatalogPartitionSpec} from a Hive partition name string, e.g. "name=bob/year=2019".
+	 * Each "key=value" segment is split at its first '=' only, so a value is kept whole even if empty or containing '='.
+	 */
+	private static CatalogPartitionSpec createPartitionSpec(String hivePartitionName) {
+		String[] partKeyVals = hivePartitionName.split("/");
+		Map<String, String> spec = new HashMap<>(partKeyVals.length);
+		for (String keyVal : partKeyVals) {
+			String[] kv = keyVal.split("=", 2);
+			spec.put(kv[0], kv[1]);
+		}
+		return new CatalogPartitionSpec(spec);
+	}
+
+	/**
+	 * Arranges the values of a full partition spec in the order given by the table's partition keys.
+	 *
+	 * @param partitionSpec a partition spec.
+	 * @param partitionKeys a list of partition keys.
+	 * @param tablePath path of the table to which the partition belongs.
+	 * @return A list of partition values ordered according to partitionKeys.
+	 * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have different sizes,
+	 *                                       or any key in partitionKeys doesn't exist in partitionSpec.
+	 */
+	private List<String> getOrderedFullPartitionValues(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+			throws PartitionSpecInvalidException {
+		final Map<String, String> keyToValue = partitionSpec.getPartitionSpec();
+		// a full spec has exactly one entry per partition key, so a size mismatch is rejected up front
+		if (keyToValue.size() != partitionKeys.size()) {
+			throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+		}
+
+		final List<String> orderedValues = new ArrayList<>(keyToValue.size());
+		for (String partitionKey : partitionKeys) {
+			if (!keyToValue.containsKey(partitionKey)) {
+				throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+			}
+			orderedValues.add(keyToValue.get(partitionKey));
+		}
+
+		return orderedValues;
+	}
+
+	private Partition getHivePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws TableNotExistException, PartitionSpecInvalidException, TException {
+		return getHivePartition(getHiveTable(tablePath), partitionSpec);
+	}
+
+	private Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
+			throws PartitionSpecInvalidException, TException {
+		return client.getPartition(hiveTable.getDbName(), hiveTable.getTableName(),
+			getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()),
+				new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName())));
 	}
 
 	// ------ functions ------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
new file mode 100644
index 0000000..98b13a2
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.AbstractCatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A CatalogPartition implementation that represents a Partition in Hive.
+ */
+public class HiveCatalogPartition extends AbstractCatalogPartition {
+	private final String location;
+
+	public HiveCatalogPartition(Map<String, String> properties, String location) {
+		super(properties, null);
+		this.location = location;
+	}
+
+	public HiveCatalogPartition(Map<String, String> properties) {
+		this(properties, null);
+	}
+
+	public String getLocation() {
+		return location;
+	}
+
+	@Override
+	public CatalogPartition copy() {
+		return new HiveCatalogPartition(new HashMap<>(getProperties()), location);
+	}
+
+	@Override
+	public Optional<String> getDescription() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<String> getDetailedDescription() {
+		return Optional.empty();
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 9a35068..90ec11e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
@@ -92,6 +93,108 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
 		checkEquals(table, (CatalogTable) catalog.getTable(path1));
 	}
 
+	// ------ partitions ------
+
+	@Test
+	public void testCreatePartition() throws Exception {
+	}
+
+	@Test
+	public void testCreatePartition_TableNotExistException() throws Exception {
+	}
+
+	@Test
+	public void testCreatePartition_TableNotPartitionedException() throws Exception {
+	}
+
+	@Test
+	public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
+	}
+
+	@Test
+	public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
+	}
+
+	@Test
+	public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
+	}
+
+	@Test
+	public void testDropPartition() throws Exception {
+	}
+
+	@Test
+	public void testDropPartition_TableNotExist() throws Exception {
+	}
+
+	@Test
+	public void testDropPartition_TableNotPartitioned() throws Exception {
+	}
+
+	@Test
+	public void testDropPartition_PartitionSpecInvalid() throws Exception {
+	}
+
+	@Test
+	public void testDropPartition_PartitionNotExist() throws Exception {
+	}
+
+	@Test
+	public void testDropPartition_PartitionNotExist_ignored() throws Exception {
+	}
+
+	@Test
+	public void testAlterPartition() throws Exception {
+	}
+
+	@Test
+	public void testAlterPartition_TableNotExist() throws Exception {
+	}
+
+	@Test
+	public void testAlterPartition_TableNotPartitioned() throws Exception {
+	}
+
+	@Test
+	public void testAlterPartition_PartitionSpecInvalid() throws Exception {
+	}
+
+	@Test
+	public void testAlterPartition_PartitionNotExist() throws Exception {
+	}
+
+	@Test
+	public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
+	}
+
+	@Test
+	public void testGetPartition_TableNotExist() throws Exception {
+	}
+
+	@Test
+	public void testGetPartition_TableNotPartitioned() throws Exception {
+	}
+
+	@Test
+	public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() throws Exception {
+	}
+
+	@Test
+	public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() throws Exception {
+	}
+
+	@Test
+	public void testGetPartition_PartitionNotExist() throws Exception {
+	}
+
+	@Test
+	public void testPartitionExists() throws Exception {
+	}
+
+	@Test
+	public void testListPartitionPartialSpec() throws Exception {
+	}
+
 	// ------ test utils ------
 
 	@Override
@@ -183,4 +286,9 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
 	protected CatalogFunction createAnotherFunction() {
 		return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
 	}
+
+	@Override
+	public CatalogPartition createPartition() {
+		throw new UnsupportedOperationException();
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 1126564..e2d95a3 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
@@ -144,6 +145,11 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 	}
 
 	@Override
+	public CatalogPartition createPartition() {
+		return new HiveCatalogPartition(getBatchTableProperties());
+	}
+
+	@Override
 	public void checkEquals(CatalogTable t1, CatalogTable t2) {
 		assertEquals(t1.getSchema(), t2.getSchema());
 		assertEquals(t1.getComment(), t2.getComment());
@@ -155,6 +161,7 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 		assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
 	}
 
+	@Override
 	protected void checkEquals(CatalogView v1, CatalogView v2) {
 		assertEquals(v1.getSchema(), v1.getSchema());
 		assertEquals(v1.getComment(), v2.getComment());
@@ -165,4 +172,15 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 		// thus properties of Hive view is a super set of those in its corresponding Flink view
 		assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
 	}
+
+	@Override
+	protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
+		assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
+		assertEquals(expected.getClass(), actual.getClass());
+		HiveCatalogPartition hivePartition1 = (HiveCatalogPartition) expected;
+		HiveCatalogPartition hivePartition2 = (HiveCatalogPartition) actual;
+		assertEquals(hivePartition1.getDescription(), hivePartition2.getDescription());
+		assertEquals(hivePartition1.getDetailedDescription(), hivePartition2.getDetailedDescription());
+		assertTrue(hivePartition2.getProperties().entrySet().containsAll(hivePartition1.getProperties().entrySet()));
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index c90e1722..d46a423 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -344,6 +344,12 @@ public class GenericInMemoryCatalog implements Catalog {
 		return databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
 	}
 
+	private void ensureTableExists(ObjectPath tablePath) throws TableNotExistException {
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(catalogName, tablePath);
+		}
+	}
+
 	// ------ functions ------
 
 	@Override
@@ -437,13 +443,9 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(partitionSpec);
 		checkNotNull(partition);
 
-		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
-		}
-
-		if (!isPartitionedTable(tablePath)) {
-			throw new TableNotPartitionedException(catalogName, tablePath);
-		}
+		ensureTableExists(tablePath);
+		ensurePartitionedTable(tablePath);
+		ensureFullPartitionSpec(tablePath, partitionSpec);
 
 		if (partitionExists(tablePath, partitionSpec)) {
 			if (!ignoreIfExists) {
@@ -451,11 +453,6 @@ public class GenericInMemoryCatalog implements Catalog {
 			}
 		}
 
-		if (!isPartitionSpecValid(tablePath, partitionSpec)) {
-			throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
-				tablePath, partitionSpec);
-		}
-
 		partitions.get(tablePath).put(partitionSpec, partition.copy());
 	}
 
@@ -502,13 +499,8 @@ public class GenericInMemoryCatalog implements Catalog {
 			throws TableNotExistException, TableNotPartitionedException, CatalogException {
 		checkNotNull(tablePath);
 
-		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
-		}
-
-		if (!isPartitionedTable(tablePath)) {
-			throw new TableNotPartitionedException(catalogName, tablePath);
-		}
+		ensureTableExists(tablePath);
+		ensurePartitionedTable(tablePath);
 
 		return new ArrayList<>(partitions.get(tablePath).keySet());
 	}
@@ -519,15 +511,12 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(tablePath);
 		checkNotNull(partitionSpec);
 
-		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
-		}
-
-		if (!isPartitionedTable(tablePath)) {
-			throw new TableNotPartitionedException(catalogName, tablePath);
-		}
+		ensurePartitionedTable(tablePath);
 
-		if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+		CatalogTable catalogTable = (CatalogTable) getTable(tablePath);
+		List<String> partKeys = catalogTable.getPartitionKeys();
+		Map<String, String> spec = partitionSpec.getPartitionSpec();
+		if (!partKeys.containsAll(spec.keySet())) {
 			return new ArrayList<>();
 		}
 
@@ -558,41 +547,36 @@ public class GenericInMemoryCatalog implements Catalog {
 		return partitions.containsKey(tablePath) && partitions.get(tablePath).containsKey(partitionSpec);
 	}
 
+	private void ensureFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+		throws TableNotExistException, PartitionSpecInvalidException {
+		if (!isFullPartitionSpec(tablePath, partitionSpec)) {
+			throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+				tablePath, partitionSpec);
+		}
+	}
+
 	/**
-	 * Check if the given partitionSpec is valid for the given table.
-	 * Note that partition spec is considered invalid if the table doesn't exist or isn't partitioned.
+	 * Check if the given partitionSpec is full partition spec for the given table.
 	 */
-	private boolean isPartitionSpecValid(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) {
-		CatalogBaseTable baseTable;
-		try {
-			baseTable = getTable(tablePath);
-		} catch (TableNotExistException e) {
-			return false;
-		}
+	private boolean isFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException {
+		CatalogBaseTable baseTable = getTable(tablePath);
 
 		if (!(baseTable instanceof CatalogTable)) {
 			return false;
 		}
 
-		CatalogTable table =  (CatalogTable) baseTable;
+		CatalogTable table = (CatalogTable) baseTable;
 		List<String> partitionKeys = table.getPartitionKeys();
 		Map<String, String> spec = partitionSpec.getPartitionSpec();
 
 		// The size of partition spec should not exceed the size of partition keys
-		if (partitionKeys.size() < spec.size()) {
-			return false;
-		} else {
-			int size = spec.size();
+		return partitionKeys.size() == spec.size() && spec.keySet().containsAll(partitionKeys);
+	}
 
-			// PartitionSpec should contain the first 'size' number of keys in partition key list
-			for (int i = 0; i < size; i++) {
-				if (!spec.containsKey(partitionKeys.get(i))) {
-					return false;
-				}
-			}
+	private void ensurePartitionedTable(ObjectPath tablePath) throws TableNotPartitionedException {
+		if (!isPartitionedTable(tablePath)) {
+			throw new TableNotPartitionedException(catalogName, tablePath);
 		}
-
-		return true;
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d332456..89eb043 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -19,11 +19,6 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -39,13 +34,10 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -100,194 +92,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	// ------ partitions ------
 
 	@Test
-	public void testCreatePartition() throws Exception {
-		CatalogTable table = createPartitionedTable();
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, table, false);
-
-		assertTrue(catalog.listPartitions(path1).isEmpty());
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		catalog.createPartition(path1, partitionSpec, createPartition(), false);
-
-		assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1));
-		assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1, createPartitionSpecSubset()));
-		CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
-
-		CatalogPartitionSpec anotherPartitionSpec = createAnotherPartitionSpec();
-		CatalogPartition anotherPartition = createAnotherPartition();
-		catalog.createPartition(path1, anotherPartitionSpec, anotherPartition, false);
-
-		assertEquals(Arrays.asList(partitionSpec, anotherPartitionSpec), catalog.listPartitions(path1));
-		assertEquals(Arrays.asList(partitionSpec, anotherPartitionSpec), catalog.listPartitions(path1, createPartitionSpecSubset()));
-		CatalogTestUtil.checkEquals(anotherPartition, catalog.getPartition(path1, anotherPartitionSpec));
-
-		CatalogPartitionSpec invalid = createInvalidPartitionSpecSubset();
-		assertTrue(catalog.listPartitions(path1, invalid).isEmpty());
-	}
-
-	@Test
-	public void testCreatePartition_TableNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage(
-			String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-	}
-
-	@Test
-	public void testCreatePartition_TableNotPartitionedException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createTable(), false);
-
-		exception.expect(TableNotPartitionedException.class);
-		exception.expectMessage(
-			String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-	}
-
-	@Test
-	public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionSpecInvalidException.class);
-		exception.expectMessage(
-			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
-				partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.createPartition(path1, partitionSpec, createPartition(), false);
-	}
-
-	@Test
-	public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-		CatalogPartition partition = createPartition();
-		catalog.createPartition(path1, createPartitionSpec(), partition, false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
-		exception.expect(PartitionAlreadyExistsException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s already exists.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.createPartition(path1, partitionSpec, createPartition(), false);
-	}
-
-	@Test
-	public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		catalog.createPartition(path1, partitionSpec, createPartition(), false);
-		catalog.createPartition(path1, partitionSpec, createPartition(), true);
-	}
-
-	@Test
-	public void testDropPartition() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-
-		assertEquals(Arrays.asList(createPartitionSpec()), catalog.listPartitions(path1));
-
-		catalog.dropPartition(path1, createPartitionSpec(), false);
-
-		assertEquals(Arrays.asList(), catalog.listPartitions(path1));
-	}
-
-	@Test
-	public void testDropPartition_PartitionNotExistException_TableNotExist() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.dropPartition(path1, partitionSpec, false);
-	}
-
-	@Test
-	public void testDropPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createTable(), false);
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.dropPartition(path1, partitionSpec, false);
-	}
-
-	@Test
-	public void testDropPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.dropPartition(path1, partitionSpec, false);
-	}
-
-	@Test
-	public void testDropPartition_PartitionNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.dropPartition(path1, partitionSpec, false);
-	}
-
-	@Test
-	public void testDropPartition_PartitionNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-		catalog.dropPartition(path1, createPartitionSpec(), true);
-	}
-
-	@Test
-	public void testAlterPartition() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		catalog.createPartition(path1, partitionSpec, createPartition(), false);
-
-		assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1));
-
-		CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
-		CatalogTestUtil.checkEquals(createPartition(), cp);
-
-		assertNull(cp.getProperties().get("k"));
-
-		Map<String, String> partitionProperties = getBatchTableProperties();
-		partitionProperties.put("k", "v");
-
-		CatalogPartition another = createPartition(partitionProperties);
-		catalog.alterPartition(path1, createPartitionSpec(), another, false);
-
-		assertEquals(Arrays.asList(createPartitionSpec()), catalog.listPartitions(path1));
-
-		cp = catalog.getPartition(path1, createPartitionSpec());
-		CatalogTestUtil.checkEquals(another, cp);
-
-		assertEquals("v", cp.getProperties().get("k"));
-	}
-
-	@Test
 	public void testAlterPartition_differentTypedPartition() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		catalog.createTable(path1, createPartitionedTable(), false);
@@ -305,140 +109,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		catalog.alterPartition(path1, partitionSpec, new TestPartition(), false);
 	}
 
-	@Test
-	public void testAlterPartition_PartitionNotExistException_TableNotExist() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
-	}
-
-	@Test
-	public void testAlterPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createTable(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
-	}
-
-	@Test
-	public void testAlterPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
-	}
-
-	@Test
-	public void testAlterPartition_PartitionNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-
-		CatalogPartition catalogPartition = createPartition();
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.alterPartition(path1, partitionSpec, catalogPartition, false);
-	}
-
-	@Test
-	public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-		catalog.alterPartition(path1, createPartitionSpec(), createPartition(), true);
-	}
-
-	@Test
-	public void testGetPartition_PartitionNotExistException_TableNotExist() throws Exception {
-		exception.expect(PartitionNotExistException.class);
-		catalog.getPartition(path1, createPartitionSpec());
-	}
-
-	@Test
-	public void testGetPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createTable(), false);
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
-				path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.getPartition(path1, partitionSpec);
-	}
-
-	@Test
-	public void testGetPartition_PartitionSpecInvalidException_invalidPartitionSpec() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.getPartition(path1, partitionSpec);
-	}
-
-	@Test
-	public void testGetPartition_PartitionNotExistException_PartitionSpecInvalid_sizeNotEqual() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
-			new HashMap<String, String>() {{
-				put("second", "bob");
-			}}
-		);
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.getPartition(path1, partitionSpec);
-	}
-
-	@Test
-	public void testGetPartition_PartitionNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-
-		CatalogPartitionSpec partitionSpec = createPartitionSpec();
-		exception.expect(PartitionNotExistException.class);
-		exception.expectMessage(
-			String.format("Partition %s of table %s in catalog %s does not exist.",
-				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-		catalog.getPartition(path1, partitionSpec);
-	}
-
-	@Test
-	public void testPartitionExists() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createPartitionedTable(), false);
-		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-
-		assertTrue(catalog.partitionExists(path1, createPartitionSpec()));
-		assertFalse(catalog.partitionExists(path2, createPartitionSpec()));
-		assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
-	}
-
 	// ------ statistics ------
 
 	@Test
@@ -542,48 +212,11 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			TEST_COMMENT);
 	}
 
-	private CatalogPartitionSpec createPartitionSpec() {
-		return new CatalogPartitionSpec(
-			new HashMap<String, String>() {{
-				put("third", "2000");
-				put("second", "bob");
-			}});
-	}
-
-	private CatalogPartitionSpec createAnotherPartitionSpec() {
-		return new CatalogPartitionSpec(
-			new HashMap<String, String>() {{
-				put("third", "2010");
-				put("second", "bob");
-			}});
-	}
-
-	private CatalogPartitionSpec createPartitionSpecSubset() {
-		return new CatalogPartitionSpec(
-			new HashMap<String, String>() {{
-				put("second", "bob");
-			}});
-	}
-
-	private CatalogPartitionSpec createInvalidPartitionSpecSubset() {
-		return new CatalogPartitionSpec(
-			new HashMap<String, String>() {{
-				put("third", "2010");
-			}});
-	}
-
-	private CatalogPartition createPartition() {
-		return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
-	}
-
-	private CatalogPartition createAnotherPartition() {
+	@Override
+	public CatalogPartition createPartition() {
 		return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
 	}
 
-	private CatalogPartition createPartition(Map<String, String> props) {
-		return new GenericCatalogPartition(props, "Generic catalog table");
-	}
-
 	@Override
 	public CatalogView createView() {
 		return new GenericCatalogView(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index cd5be1a..20c4b7e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -273,7 +273,7 @@ public interface Catalog {
 	 * @param partitionSpec partition spec of partition to get
 	 * @return the requested partition
 	 *
-	 * @throws PartitionNotExistException thrown if the partition is not partitioned
+	 * @throws PartitionNotExistException thrown if the partition doesn't exist
 	 * @throws CatalogException	in case of any runtime exception
 	 */
 	CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 5bfb98a..88d195e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -27,8 +27,12 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.functions.ScalarFunction;
 
 import org.junit.After;
@@ -38,6 +42,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -47,6 +52,7 @@ import java.util.Optional;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -753,6 +759,330 @@ public abstract class CatalogTestBase {
 		catalog.dropDatabase(db1, false);
 	}
 
+	// ------ partitions ------
+
+	@Test
+	public void testCreatePartition() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+
+		assertTrue(catalog.listPartitions(path1).isEmpty());
+
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
+		checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
+
+		catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
+
+		assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1));
+		assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
+		checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
+	}
+
+	@Test
+	public void testCreatePartition_TableNotExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage(
+			String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+	}
+
+	@Test
+	public void testCreatePartition_TableNotPartitionedException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		exception.expect(TableNotPartitionedException.class);
+		exception.expectMessage(
+			String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+	}
+
+	@Test
+	public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+		exception.expect(PartitionSpecInvalidException.class);
+		exception.expectMessage(
+			String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
+				partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.createPartition(path1, partitionSpec, createPartition(), false);
+	}
+
+	@Test
+	public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		CatalogPartition partition = createPartition();
+		catalog.createPartition(path1, createPartitionSpec(), partition, false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+
+		exception.expect(PartitionAlreadyExistsException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s already exists.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.createPartition(path1, partitionSpec, createPartition(), false);
+	}
+
+	@Test
+	public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		catalog.createPartition(path1, partitionSpec, createPartition(), false);
+		catalog.createPartition(path1, partitionSpec, createPartition(), true);
+	}
+
+	@Test
+	public void testDropPartition() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
+		catalog.dropPartition(path1, createPartitionSpec(), false);
+
+		assertEquals(Collections.emptyList(), catalog.listPartitions(path1));
+	}
+
+	@Test
+	public void testDropPartition_TableNotExist() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.dropPartition(path1, partitionSpec, false);
+	}
+
+	@Test
+	public void testDropPartition_TableNotPartitioned() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.dropPartition(path1, partitionSpec, false);
+	}
+
+	@Test
+	public void testDropPartition_PartitionSpecInvalid() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.dropPartition(path1, partitionSpec, false);
+	}
+
+	@Test
+	public void testDropPartition_PartitionNotExist() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.dropPartition(path1, partitionSpec, false);
+	}
+
+	@Test
+	public void testDropPartition_PartitionNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.dropPartition(path1, createPartitionSpec(), true);
+	}
+
+	@Test
+	public void testAlterPartition() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+		CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
+		checkEquals(createPartition(), cp);
+		assertNull(cp.getProperties().get("k"));
+
+		CatalogPartition another = createPartition();
+		another.getProperties().put("k", "v");
+		catalog.alterPartition(path1, createPartitionSpec(), another, false);
+
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+		cp = catalog.getPartition(path1, createPartitionSpec());
+		checkEquals(another, cp);
+		assertEquals("v", cp.getProperties().get("k"));
+	}
+
+	@Test
+	public void testAlterPartition_TableNotExist() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+	}
+
+	@Test
+	public void testAlterPartition_TableNotPartitioned() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+	}
+
+	@Test
+	public void testAlterPartition_PartitionSpecInvalid() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+	}
+
+	@Test
+	public void testAlterPartition_PartitionNotExist() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+	}
+
+	@Test
+	public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.alterPartition(path1, createPartitionSpec(), createPartition(), true);
+	}
+
+	@Test
+	public void testGetPartition_TableNotExist() throws Exception {
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+				path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.getPartition(path1, partitionSpec);
+	}
+
+	@Test
+	public void testGetPartition_TableNotPartitioned() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+				path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.getPartition(path1, partitionSpec);
+	}
+
+	@Test
+	public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.getPartition(path1, partitionSpec);
+	}
+
+	@Test
+	public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
+			new HashMap<String, String>() {{
+				put("second", "bob");
+			}}
+		);
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.getPartition(path1, partitionSpec);
+	}
+
+	@Test
+	public void testGetPartition_PartitionNotExist() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+
+		CatalogPartitionSpec partitionSpec = createPartitionSpec();
+		exception.expect(PartitionNotExistException.class);
+		exception.expectMessage(
+			String.format("Partition %s of table %s in catalog %s does not exist.",
+				partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+		catalog.getPartition(path1, partitionSpec);
+	}
+
+	@Test
+	public void testPartitionExists() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+		assertTrue(catalog.partitionExists(path1, createPartitionSpec()));
+		assertFalse(catalog.partitionExists(path2, createPartitionSpec()));
+		assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
+	}
+
+	@Test
+	public void testListPartitionPartialSpec() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+		catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
+
+		assertEquals(2, catalog.listPartitions(path1, createPartitionSpecSubset()).size());
+		assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
+	}
+
 	// ------ utilities ------
 
 	/**
@@ -832,6 +1162,13 @@ public abstract class CatalogTestBase {
 	 */
 	protected abstract CatalogFunction createAnotherFunction();
 
+	/**
+	 * Creates a {@link CatalogPartition} instance as provided by the specific catalog
+	 * implementation under test.
+	 *
+	 * <p>The method is abstract, so each concrete test class decides which partition
+	 * type is returned.
+	 *
+	 * @return a CatalogPartition
+	 */
+	public abstract CatalogPartition createPartition();
+
 	protected TableSchema createTableSchema() {
 		return new TableSchema(
 			new String[] {"first", "second", "third"},
@@ -858,6 +1195,44 @@ public abstract class CatalogTestBase {
 		return Arrays.asList("second", "third");
 	}
 
+	/**
+	 * Creates a partition spec that assigns a value to both the "third" and the
+	 * "second" column ("third" = "2000", "second" = "bob").
+	 *
+	 * <p>A plain {@link HashMap} is used instead of double-brace initialization so that no
+	 * anonymous map subclass holding a reference to this test instance is created.
+	 *
+	 * @return a new CatalogPartitionSpec on every call
+	 */
+	protected CatalogPartitionSpec createPartitionSpec() {
+		final Map<String, String> spec = new HashMap<>();
+		spec.put("third", "2000");
+		spec.put("second", "bob");
+		return new CatalogPartitionSpec(spec);
+	}
+
+	/**
+	 * Creates a second partition spec that assigns a value to both the "third" and the
+	 * "second" column ("third" = "2010", "second" = "bob").
+	 *
+	 * <p>A plain {@link HashMap} is used instead of double-brace initialization so that no
+	 * anonymous map subclass holding a reference to this test instance is created.
+	 *
+	 * @return a new CatalogPartitionSpec on every call
+	 */
+	protected CatalogPartitionSpec createAnotherPartitionSpec() {
+		final Map<String, String> spec = new HashMap<>();
+		spec.put("third", "2010");
+		spec.put("second", "bob");
+		return new CatalogPartitionSpec(spec);
+	}
+
+	/**
+	 * Creates a partial partition spec that only fixes the "second" column
+	 * ("second" = "bob") and leaves "third" unspecified.
+	 *
+	 * <p>A plain {@link HashMap} is used instead of double-brace initialization so that no
+	 * anonymous map subclass holding a reference to this test instance is created.
+	 *
+	 * @return a new CatalogPartitionSpec on every call
+	 */
+	protected CatalogPartitionSpec createPartitionSpecSubset() {
+		final Map<String, String> spec = new HashMap<>();
+		spec.put("second", "bob");
+		return new CatalogPartitionSpec(spec);
+	}
+
+	/**
+	 * Creates a partial partition spec that only fixes the "third" column
+	 * ("third" = "2000") and leaves "second" unspecified.
+	 *
+	 * <p>A plain {@link HashMap} is used instead of double-brace initialization so that no
+	 * anonymous map subclass holding a reference to this test instance is created.
+	 *
+	 * @return a new CatalogPartitionSpec on every call
+	 */
+	protected CatalogPartitionSpec createAnotherPartitionSpecSubset() {
+		final Map<String, String> spec = new HashMap<>();
+		spec.put("third", "2000");
+		return new CatalogPartitionSpec(spec);
+	}
+
+	/**
+	 * Creates a partial partition spec that only fixes the "third" column
+	 * ("third" = "2010").
+	 *
+	 * <p>NOTE(review): the reason this spec counts as "invalid" is not visible in this
+	 * method; confirm against the tests that consume it.
+	 *
+	 * <p>A plain {@link HashMap} is used instead of double-brace initialization so that no
+	 * anonymous map subclass holding a reference to this test instance is created.
+	 *
+	 * @return a new CatalogPartitionSpec on every call
+	 */
+	protected CatalogPartitionSpec createInvalidPartitionSpecSubset() {
+		final Map<String, String> spec = new HashMap<>();
+		spec.put("third", "2010");
+		return new CatalogPartitionSpec(spec);
+	}
+
 	protected Map<String, String> getBatchTableProperties() {
 		return new HashMap<String, String>() {{
 			put(IS_STREAMING, "false");
@@ -1032,4 +1407,8 @@ public abstract class CatalogTestBase {
 		assertEquals(f1.getClassName(), f2.getClassName());
 		assertEquals(f1.getProperties(), f2.getProperties());
 	}
+
+	/**
+	 * Asserts that two partitions are equal by comparing their properties.
+	 *
+	 * @param expected the partition holding the expected properties
+	 * @param actual the partition whose properties are checked
+	 */
+	protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
+		final Map<String, String> expectedProperties = expected.getProperties();
+		final Map<String, String> actualProperties = actual.getProperties();
+		assertEquals(expectedProperties, actualProperties);
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 2b98091..2b7593e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -50,10 +50,6 @@ public class CatalogTestUtil {
 		assertEquals(d1.getProperties(), d2.getProperties());
 	}
 
-	public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
-		assertEquals(p1.getProperties(), p2.getProperties());
-	}
-
 	static void checkEquals(CatalogTableStatistics ts1, CatalogTableStatistics ts2) {
 		assertEquals(ts1.getRowCount(), ts2.getRowCount());
 		assertEquals(ts1.getFileCount(), ts2.getFileCount());