You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/06 17:15:32 UTC

[flink] branch master updated: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf20197  [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
cf20197 is described below

commit cf20197f1c8c51eab028da7c477dd6c7ad96db2f
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed May 1 14:26:35 2019 -0700

    [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
    
    This PR enables GenericHiveMetastoreCatalog to operate on Flink tables by using the Hive metastore as storage. Flink tables are stored as Hive tables in the metastore, and GenericHiveMetastoreCatalog converts between Flink and Hive tables upon read and write.
    
    This closes #8329
---
 flink-connectors/flink-connector-hive/pom.xml      |   5 +-
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 114 +++++++-
 .../hive/GenericHiveMetastoreCatalogUtil.java      | 160 ++++++++++-
 .../table/catalog/hive/HiveCatalogBaseUtil.java    |  58 ++++
 .../flink/table/catalog/hive/HiveTableConfig.java  |  16 +-
 .../flink/table/catalog/hive/HiveTypeUtil.java     | 140 ++++++++++
 .../src/main/resources/META-INF/NOTICE             |   3 +
 .../hive/GenericHiveMetastoreCatalogTest.java      |  96 ++++++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |   4 +-
 .../flink/table/catalog/GenericCatalogTable.java   |  17 +-
 .../table/catalog/GenericInMemoryCatalogTest.java  | 298 ++-------------------
 .../flink/table/catalog/CatalogBaseTable.java      |  14 +-
 .../flink/table/catalog/ReadableCatalog.java       |  12 +-
 .../table/catalog/ReadableWritableCatalog.java     |   7 +-
 .../exceptions/DatabaseNotEmptyException.java      |   2 +-
 .../flink/table/catalog/CatalogTestBase.java       | 297 +++++++++++++++++++-
 .../flink/table/catalog/CatalogTestUtil.java       |  10 +-
 17 files changed, 900 insertions(+), 353 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index cb09934..1b672d4 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -197,10 +197,6 @@ under the License.
 					<artifactId>metrics-json</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>com.fasterxml.jackson.core</groupId>
-					<artifactId>jackson-databind</artifactId>
-				</exclusion>
-				<exclusion>
 					<groupId>com.github.joshelser</groupId>
 					<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
 				</exclusion>
@@ -387,6 +383,7 @@ under the License.
 									<include>commons-dbcp:commons-dbcp</include>
 									<include>commons-pool:commons-pool</include>
 									<include>commons-beanutils:commons-beanutils</include>
+									<include>com.fasterxml.jackson.core:*</include>
 									<include>com.jolbox:bonecp</include>
 									<include>org.apache.hive:*</include>
 									<include>org.apache.thrift:libthrift</include>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index a8fcf62..50ed2e9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -229,31 +231,97 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
 	@Override
 	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
 			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			client.dropTable(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				// Indicate whether associated data should be deleted.
+				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
+				true,
+				ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to drop table %s", tablePath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-			throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+			throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		try {
+			// alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly
+			if (tableExists(tablePath)) {
+				ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+				// alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly
+				if (tableExists(newPath)) {
+					throw new TableAlreadyExistException(catalogName, newPath);
+				} else {
+					Table table = getHiveTable(tablePath);
+					table.setTableName(newTableName);
+					client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+				}
+			} else if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to rename table %s", tablePath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
 			throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		if (!databaseExists(tablePath.getDatabaseName())) {
+			throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+		} else {
+			try {
+				client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table));
+			} catch (AlreadyExistsException e) {
+				if (!ignoreIfExists) {
+					throw new TableAlreadyExistException(catalogName, tablePath);
+				}
+			} catch (TException e) {
+				throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
+			}
+		}
 	}
 
 	@Override
-	public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+	public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
 			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		if (!tableExists(tablePath)) {
+			if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} else {
+			// IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case
+			// Thus we have to translate alterTable() into (dropTable() + createTable())
+			dropTable(tablePath, false);
+			try {
+				createTable(tablePath, newTable, false);
+			} catch (TableAlreadyExistException | DatabaseNotExistException e) {
+				// These exceptions wouldn't be thrown, unless a concurrent operation is triggered in Hive
+				throw new CatalogException(
+					String.format("Failed to alter table %s", tablePath), e);
+			}
+		}
 	}
 
 	@Override
-	public List<String> listTables(String databaseName)
-			throws DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+		try {
+			return client.getAllTables(databaseName);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list tables in database %s", databaseName), e);
+		}
 	}
 
 	@Override
@@ -262,13 +330,33 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
 	}
 
 	@Override
-	public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		Table hiveTable = getHiveTable(tablePath);
+
+		return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
+	}
+
+	protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+		try {
+			return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+		} catch (NoSuchObjectException e) {
+			throw new TableNotExistException(catalogName, tablePath);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
+		}
 	}
 
 	@Override
-	public boolean tableExists(ObjectPath objectPath) throws CatalogException {
-		throw new UnsupportedOperationException();
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+		try {
+			return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+		} catch (UnknownDBException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+		}
 	}
 
 	// ------ partitions ------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
index 779905a..0e564f5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -18,32 +18,178 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+	// Flink tables should be stored as 'external' tables in Hive metastore
+	private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+		put("EXTERNAL", "TRUE");
+	}};
+
 	private GenericHiveMetastoreCatalogUtil() {
 	}
 
 	// ------ Utils ------
 
 	/**
-	 * Creates a Hive database from CatalogDatabase.
+	 * Creates a Hive database from a CatalogDatabase.
+	 *
+	 * @param databaseName name of the database
+	 * @param catalogDatabase the CatalogDatabase instance
+	 * @return a Hive database
 	 */
-	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-		Map<String, String> props = db.getProperties();
+	public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
 		return new Database(
-			dbName,
-			db.getDescription().isPresent() ? db.getDescription().get() : null,
+			databaseName,
+			catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
 			null,
-			props);
+			catalogDatabase.getProperties());
+	}
+
+	/**
+	 * Creates a Hive table from a CatalogBaseTable.
+	 * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+	 *
+	 * @param tablePath path of the table
+	 * @param table the CatalogBaseTable instance
+	 * @return a Hive table
+	 */
+	public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+
+		// Table comment
+		properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+
+		Table hiveTable = new Table();
+		hiveTable.setDbName(tablePath.getDatabaseName());
+		hiveTable.setTableName(tablePath.getObjectName());
+		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+		// Table properties
+		hiveTable.setParameters(buildFlinkProperties(properties));
+		hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+		// Hive table's StorageDescriptor
+		StorageDescriptor sd = new StorageDescriptor();
+		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+		List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+		// Table columns and partition keys
+		if (table instanceof CatalogTable) {
+			CatalogTable catalogTable = (CatalogTable) table;
+
+			if (catalogTable.isPartitioned()) {
+				int partitionKeySize = catalogTable.getPartitionKeys().size();
+				List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+				List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+				sd.setCols(regularColumns);
+				hiveTable.setPartitionKeys(partitionColumns);
+			} else {
+				sd.setCols(allColumns);
+				hiveTable.setPartitionKeys(new ArrayList<>());
+			}
+
+			hiveTable.setSd(sd);
+		} else {
+			// TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+			throw new UnsupportedOperationException();
+		}
+
+		return hiveTable;
+	}
+
+	/**
+	 * Creates a CatalogBaseTable from a Hive table.
+	 * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+	 *
+	 * @param hiveTable the Hive table
+	 * @return a CatalogBaseTable
+	 */
+	public static CatalogBaseTable createCatalogTable(Table hiveTable) {
+		// Table schema: regular columns followed by partition columns
+		TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
+				hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+
+		// Table properties: only the Flink-created ones, with the Flink prefix stripped
+		Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
+
+		// Table comment is stored as a regular property on write; pull it back out (null if absent)
+		String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
+
+		// Partition keys: names of the Hive partition columns; stays empty for non-partitioned tables
+		List<String> partitionKeys = new ArrayList<>();
+
+		if (hiveTable.getPartitionKeys() != null && !hiveTable.getPartitionKeys().isEmpty()) {
+			partitionKeys = hiveTable.getPartitionKeys().stream()
+								.map(fs -> fs.getName())
+								.collect(Collectors.toList());
+		}
+
+		return new GenericCatalogTable(
+			tableSchema, new TableStats(0), partitionKeys, properties, comment);
+	}
+
+	/**
+	 * Create Hive columns from Flink TableSchema.
+	 */
+	private static List<FieldSchema> createHiveColumns(TableSchema schema) {
+		String[] fieldNames = schema.getFieldNames();
+		TypeInformation[] fieldTypes = schema.getFieldTypes();
+
+		List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+		for (int i = 0; i < fieldNames.length; i++) {
+			columns.add(
+				new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+		}
+
+		return columns;
+	}
+
+	/**
+	 * Filter out Hive-created properties, and return Flink-created properties with only the leading prefix stripped.
+	 */
+	private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+		return hiveTableParams.entrySet().stream()
+			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+			.collect(Collectors.toMap(e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), e -> e.getValue()));
+	}
+
+	/**
+	 * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+	 */
+	public static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
+		return properties.entrySet().stream()
+			.filter(e -> e.getKey() != null && e.getValue() != null)
+			.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
 	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
new file mode 100644
index 0000000..3f93b4a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared util for catalogs backed by Hive-metastore.
+ */
+public class HiveCatalogBaseUtil {
+
+	/**
+	 * Create a Flink's TableSchema from Hive table's columns and partition keys.
+	 *
+	 * @param cols columns of the Hive table
+	 * @param partitionKeys partition keys of the Hive table
+	 * @return a Flink TableSchema
+	 */
+	public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
+		List<FieldSchema> allCols = new ArrayList<>(cols);
+		allCols.addAll(partitionKeys);
+
+		String[] colNames = new String[allCols.size()];
+		TypeInformation[] colTypes = new TypeInformation[allCols.size()];
+
+		for (int i = 0; i < allCols.size(); i++) {
+			FieldSchema fs = allCols.get(i);
+
+			colNames[i] = fs.getName();
+			colTypes[i] = HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+		}
+
+		return new TableSchema(colNames, colTypes);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
similarity index 61%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 91cf133..336d16f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -16,20 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.exceptions;
+package org.apache.flink.table.catalog.hive;
 
 /**
- * Exception for trying to drop on a database that is not empty.
- *
+ * Configs for Flink tables stored in Hive metastore.
  */
-public class DatabaseNotEmptyException extends Exception {
-	private static final String MSG = "Database %s in Catalog %s is not empty.";
+public class HiveTableConfig {
 
-	public DatabaseNotEmptyException(String catalog, String database, Throwable cause) {
-		super(String.format(MSG, database, catalog), cause);
-	}
+	// Description of the Flink table
+	public static final String TABLE_COMMENT = "comment";
 
-	public DatabaseNotEmptyException(String catalog, String database) {
-		this(catalog, database, null);
-	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
new file mode 100644
index 0000000..c665d6c
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Utils to convert data types between Flink and Hive.
+ */
+public class HiveTypeUtil {
+
+	// Note: Need to keep this in sync with BaseSemanticAnalyzer::getTypeStringFromAST
+	private static final String HIVE_ARRAY_TYPE_NAME_FORMAT = serdeConstants.LIST_TYPE_NAME + "<%s>";
+
+	private HiveTypeUtil() {
+	}
+
+	/**
+	 * Convert Flink data type to Hive data type.
+	 * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+	 * 		[FLINK-12386] Support complete mapping between Flink and Hive data types
+	 *
+	 * @param type a Flink data type
+	 * @return the corresponding Hive data type
+	 */
+	public static String toHiveType(TypeInformation type) {
+		if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+			return serdeConstants.BOOLEAN_TYPE_NAME;
+		} else if (type == BasicTypeInfo.BYTE_TYPE_INFO) {
+			return serdeConstants.TINYINT_TYPE_NAME;
+		} else if (type == BasicTypeInfo.SHORT_TYPE_INFO) {
+			return serdeConstants.SMALLINT_TYPE_NAME;
+		} else if (type == BasicTypeInfo.INT_TYPE_INFO) {
+			return serdeConstants.INT_TYPE_NAME;
+		} else if (type == BasicTypeInfo.LONG_TYPE_INFO) {
+			return serdeConstants.BIGINT_TYPE_NAME;
+		} else if (type == BasicTypeInfo.FLOAT_TYPE_INFO) {
+			return serdeConstants.FLOAT_TYPE_NAME;
+		} else if (type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+			return serdeConstants.DOUBLE_TYPE_NAME;
+		} else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+			return serdeConstants.STRING_TYPE_NAME;
+		} else if (type == BasicTypeInfo.DATE_TYPE_INFO) {
+			return serdeConstants.DATE_TYPE_NAME;
+		} else if (type == BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO) {
+			return serdeConstants.BINARY_TYPE_NAME;
+		} else if (type instanceof SqlTimeTypeInfo) {
+			return serdeConstants.TIMESTAMP_TYPE_NAME;
+		} else if (type instanceof BasicArrayTypeInfo) {
+			return toHiveArrayType((BasicArrayTypeInfo) type);
+		} else {
+			throw new UnsupportedOperationException(
+				String.format("Flink doesn't support converting type %s to Hive type yet.", type.toString()));
+		}
+	}
+
+	private static String toHiveArrayType(BasicArrayTypeInfo arrayTypeInfo) {
+		return String.format(HIVE_ARRAY_TYPE_NAME_FORMAT, toHiveType(arrayTypeInfo.getComponentInfo()));
+	}
+
+	/**
+	 * Convert Hive data type to a Flink data type.
+	 * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+	 *      [FLINK-12386] Support complete mapping between Flink and Hive data types
+	 *
+	 * @param hiveType a Hive data type
+	 * @return the corresponding Flink data type
+	 */
+	public static TypeInformation toFlinkType(TypeInfo hiveType) {
+		switch (hiveType.getCategory()) {
+			case PRIMITIVE:
+				return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
+			case LIST:
+				ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
+				return BasicArrayTypeInfo.getInfoFor(toFlinkType(listTypeInfo.getListElementTypeInfo()).getTypeClass());
+			default:
+				throw new UnsupportedOperationException(
+					String.format("Flink doesn't support Hive data type %s yet.", hiveType));
+		}
+	}
+
+	// TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+	//    [FLINK-12386] Support complete mapping between Flink and Hive data types
+	private static TypeInformation toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
+		switch (hiveType.getPrimitiveCategory()) {
+			// For CHAR(p) and VARCHAR(p) types, map them to String for now because Flink doesn't yet support them.
+			case CHAR:
+			case VARCHAR:
+			case STRING:
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			case BOOLEAN:
+				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+			case BYTE:
+				return BasicTypeInfo.BYTE_TYPE_INFO;
+			case SHORT:
+				return BasicTypeInfo.SHORT_TYPE_INFO;
+			case INT:
+				return BasicTypeInfo.INT_TYPE_INFO;
+			case LONG:
+				return BasicTypeInfo.LONG_TYPE_INFO;
+			case FLOAT:
+				return BasicTypeInfo.FLOAT_TYPE_INFO;
+			case DOUBLE:
+				return BasicTypeInfo.DOUBLE_TYPE_INFO;
+			case DATE:
+				return BasicTypeInfo.DATE_TYPE_INFO;
+			case TIMESTAMP:
+				return SqlTimeTypeInfo.TIMESTAMP;
+			case BINARY:
+				return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+			default:
+				throw new UnsupportedOperationException(
+					String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
index ea1c160..b67affe 100644
--- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
@@ -8,6 +8,9 @@ This project bundles the following dependencies under the Apache Software Licens
 
 - commons-dbcp:commons-dbcp:1.4
 - commons-pool:commons-pool:1.5.4
+- com.fasterxml.jackson.core:jackson-annotations:2.6.0
+- com.fasterxml.jackson.core:jackson-core:2.6.5
+- com.fasterxml.jackson.core:jackson-databind:2.6.5
 - com.jolbox:bonecp:0.8.0.RELEASE
 - org.apache.hive:hive-common:2.3.4
 - org.apache.hive:hive-metastore:2.3.4
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 315d657..2687699 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -18,10 +18,18 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogTestUtil;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,14 +48,47 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 		catalog.open();
 	}
 
-	// =====================
-	// GenericHiveMetastoreCatalog doesn't support table operation yet
-	// Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog
-	// =====================
+	// ------ data types ------
 
-	// TODO: re-enable this test once GenericHiveMetastoreCatalog support table operations
 	@Test
-	public void testDropDb_DatabaseNotEmptyException() throws Exception {
+	public void testDataTypes() throws Exception {
+		// TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+		//	  [FLINK-12386] Support complete mapping between Flink and Hive data types
+		TypeInformation[] types = new TypeInformation[] {
+			BasicTypeInfo.BYTE_TYPE_INFO,
+			BasicTypeInfo.SHORT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.FLOAT_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.BOOLEAN_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO,
+			BasicTypeInfo.DATE_TYPE_INFO,
+			SqlTimeTypeInfo.TIMESTAMP
+		};
+
+		verifyDataTypes(types);
+	}
+
+	private void verifyDataTypes(TypeInformation[] types) throws Exception {
+		String[] colNames = new String[types.length];
+
+		for (int i = 0; i < types.length; i++) {
+			colNames[i] = types[i].toString().toLowerCase() + "_col";
+		}
+
+		CatalogTable table = new GenericCatalogTable(
+			new TableSchema(colNames, types),
+			new TableStats(0),
+			getBatchTableProperties(),
+			TEST_COMMENT
+		);
+
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
 	}
 
 	// ------ utils ------
@@ -77,13 +118,48 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 
 	@Override
 	public CatalogTable createTable() {
-		// TODO: implement this once GenericHiveMetastoreCatalog support table operations
-		return null;
+		return new GenericCatalogTable(
+			createTableSchema(),
+			new TableStats(0),
+			getBatchTableProperties(),
+			TEST_COMMENT);
 	}
 
 	@Override
 	public CatalogTable createAnotherTable() {
-		// TODO: implement this once GenericHiveMetastoreCatalog support table operations
-		return null;
+		return new GenericCatalogTable(
+			createAnotherTableSchema(),
+			new TableStats(0),
+			getBatchTableProperties(),
+			TEST_COMMENT);
+	}
+
+	@Override
+	public CatalogTable createStreamingTable() {
+		return new GenericCatalogTable(
+			createTableSchema(),
+			new TableStats(0),
+			getStreamingTableProperties(),
+			TEST_COMMENT);
+	}
+
+	@Override
+	public CatalogTable createPartitionedTable() {
+		return new GenericCatalogTable(
+			createTableSchema(),
+			new TableStats(0),
+			createPartitionKeys(),
+			getBatchTableProperties(),
+			TEST_COMMENT);
+	}
+
+	@Override
+	public CatalogTable createAnotherPartitionedTable() {
+		return new GenericCatalogTable(
+			createAnotherTableSchema(),
+			new TableStats(0),
+			createPartitionKeys(),
+			getBatchTableProperties(),
+			TEST_COMMENT);
 	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 83f5bed..4a32313 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.table.catalog.CatalogTestBase;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.junit.rules.TemporaryFolder;
 
@@ -35,7 +37,7 @@ public class HiveTestUtils {
 	 * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
 	 */
 	public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException {
-		return new GenericHiveMetastoreCatalog("test", getHiveConf());
+		return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
 	}
 
 	private static HiveConf getHiveConf() throws IOException {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 47498e7..73c2dbc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -61,8 +61,8 @@ public class GenericCatalogTable implements CatalogTable {
 			TableSchema tableSchema,
 			TableStats tableStats,
 			Map<String, String> properties,
-			String comment) {
-		this(tableSchema, tableStats, new ArrayList<>(), properties, comment);
+			String description) {
+		this(tableSchema, tableStats, new ArrayList<>(), properties, description);
 	}
 
 	@Override
@@ -91,6 +91,11 @@ public class GenericCatalogTable implements CatalogTable {
 	}
 
 	@Override
+	public String getComment() {
+		return comment;
+	}
+
+	@Override
 	public GenericCatalogTable copy() {
 		return new GenericCatalogTable(
 			this.tableSchema.copy(), this.tableStats.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
@@ -106,12 +111,4 @@ public class GenericCatalogTable implements CatalogTable {
 		return Optional.of("This is a catalog table in an im-memory catalog");
 	}
 
-	public String getComment() {
-		return this.comment;
-	}
-
-	public void setComment(String comment) {
-		this.comment = comment;
-	}
-
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f88db0d..40a0d9f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -40,7 +37,6 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -62,15 +58,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 
 	@After
 	public void close() throws Exception {
-		if (catalog.tableExists(path1)) {
-			catalog.dropTable(path1, true);
-		}
-		if (catalog.tableExists(path2)) {
-			catalog.dropTable(path2, true);
-		}
-		if (catalog.tableExists(path3)) {
-			catalog.dropTable(path3, true);
-		}
 		if (catalog.functionExists(path1)) {
 			catalog.dropFunction(path1, true);
 		}
@@ -79,108 +66,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	// ------ tables ------
 
 	@Test
-	public void testCreateTable_Streaming() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createStreamingTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testCreateTable_Batch() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		// Non-partitioned table
-		CatalogTable table = createTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogBaseTable tableCreated = catalog.getTable(path1);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
-		assertEquals(TABLE_COMMENT, tableCreated.getDescription().get());
-
-		List<String> tables = catalog.listTables(db1);
-
-		assertEquals(1, tables.size());
-		assertEquals(path1.getObjectName(), tables.get(0));
-
-		catalog.dropTable(path1, false);
-
-		// Partitioned table
-		table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
-		tables = catalog.listTables(db1);
-
-		assertEquals(1, tables.size());
-		assertEquals(path1.getObjectName(), tables.get(0));
-	}
-
-	@Test
-	public void testCreateTable_DatabaseNotExistException() throws Exception {
-		assertFalse(catalog.databaseExists(db1));
-
-		exception.expect(DatabaseNotExistException.class);
-		exception.expectMessage("Database db1 does not exist in Catalog");
-		catalog.createTable(nonExistObjectPath, createTable(), false);
-	}
-
-	@Test
-	public void testCreateTable_TableAlreadyExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1,  createTable(), false);
-
-		exception.expect(TableAlreadyExistException.class);
-		exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
-		catalog.createTable(path1, createTable(), false);
-	}
-
-	@Test
-	public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogTable table = createTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
-		catalog.createTable(path1, createAnotherTable(), true);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testGetTable_TableNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
-		catalog.getTable(nonExistObjectPath);
-	}
-
-	@Test
-	public void testGetTable_TableNotExistException_NoDb() throws Exception {
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
-		catalog.getTable(nonExistObjectPath);
-	}
-
-	@Test
-	public void testDropTable_nonPartitionedTable() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createTable(), false);
-
-		assertTrue(catalog.tableExists(path1));
-
-		catalog.dropTable(path1, false);
-
-		assertFalse(catalog.tableExists(path1));
-	}
-
-	@Test
 	public void testDropTable_partitionedTable() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		catalog.createTable(path1, createPartitionedTable(), false);
@@ -214,78 +99,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	@Test
-	public void testDropTable_TableNotExistException() throws Exception {
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
-		catalog.dropTable(nonExistDbPath, false);
-	}
-
-	@Test
-	public void testDropTable_TableNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.dropTable(nonExistObjectPath, true);
-	}
-
-	@Test
-	public void testAlterTable() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		// Non-partitioned table
-		CatalogTable table = createTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
-		CatalogTable newTable = createAnotherTable();
-		catalog.alterTable(path1, newTable, false);
-
-		assertNotEquals(table, catalog.getTable(path1));
-		CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
-
-		catalog.dropTable(path1, false);
-
-		// Partitioned table
-		table = createPartitionedTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
-		newTable = createAnotherPartitionedTable();
-		catalog.alterTable(path1, newTable, false);
-
-		CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testAlterTable_TableNotExistException() throws Exception {
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
-		catalog.alterTable(nonExistDbPath, createTable(), false);
-	}
-
-	@Test
-	public void testAlterTable_TableNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.alterTable(nonExistObjectPath, createTable(), true);
-
-		assertFalse(catalog.tableExists(nonExistObjectPath));
-	}
-
-	@Test
-	public void testRenameTable_nonPartitionedTable() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createTable();
-		catalog.createTable(path1, table, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
-		catalog.renameTable(path1, t2, false);
-
-		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
-		assertFalse(catalog.tableExists(path1));
-	}
-
-	@Test
 	public void testRenameTable_partitionedTable() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createPartitionedTable();
@@ -305,44 +118,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
 	}
 
-	@Test
-	public void testRenameTable_TableNotExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) db1.t1 does not exist in Catalog");
-		catalog.renameTable(path1, t2, false);
-	}
-
-	@Test
-	public void testRenameTable_TableNotExistException_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.renameTable(path1, t2, true);
-	}
-
-	@Test
-	public void testRenameTable_TableAlreadyExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		CatalogTable table = createTable();
-		catalog.createTable(path1, table, false);
-		catalog.createTable(path3, createAnotherTable(), false);
-
-		exception.expect(TableAlreadyExistException.class);
-		exception.expectMessage("Table (or view) db1.t2 already exists in Catalog");
-		catalog.renameTable(path1, t2, false);
-	}
-
-	@Test
-	public void testTableExists() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		assertFalse(catalog.tableExists(path1));
-
-		catalog.createTable(path1, createTable(), false);
-
-		assertTrue(catalog.tableExists(path1));
-	}
-
 	// ------ views ------
 
 	@Test
@@ -951,24 +726,29 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 
 	@Override
 	public CatalogDatabase createDb() {
-		return new GenericCatalogDatabase(new HashMap<String, String>() {{
-			put("k1", "v1");
-		}}, TEST_COMMENT);
+		return new GenericCatalogDatabase(
+			new HashMap<String, String>() {{
+				put("k1", "v1");
+			}},
+			TEST_COMMENT);
 	}
 
 	@Override
 	public CatalogDatabase createAnotherDb() {
-		return new GenericCatalogDatabase(new HashMap<String, String>() {{
-			put("k2", "v2");
-		}}, "this is another database.");
+		return new GenericCatalogDatabase(
+			new HashMap<String, String>() {{
+				put("k2", "v2");
+			}},
+			"this is another database.");
 	}
 
-	private GenericCatalogTable createStreamingTable() {
+	@Override
+	public GenericCatalogTable createStreamingTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
 			new TableStats(0),
 			getStreamingTableProperties(),
-			TABLE_COMMENT);
+			TEST_COMMENT);
 	}
 
 	@Override
@@ -977,7 +757,7 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			createTableSchema(),
 			new TableStats(0),
 			getBatchTableProperties(),
-			TABLE_COMMENT);
+			TEST_COMMENT);
 	}
 
 	@Override
@@ -986,29 +766,27 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			createAnotherTableSchema(),
 			new TableStats(0),
 			getBatchTableProperties(),
-			TABLE_COMMENT);
+			TEST_COMMENT);
 	}
 
-	protected CatalogTable createPartitionedTable() {
+	@Override
+	public CatalogTable createPartitionedTable() {
 		return new GenericCatalogTable(
 			createTableSchema(),
 			new TableStats(0),
 			createPartitionKeys(),
 			getBatchTableProperties(),
-			TABLE_COMMENT);
+			TEST_COMMENT);
 	}
 
-	protected CatalogTable createAnotherPartitionedTable() {
+	@Override
+	public CatalogTable createAnotherPartitionedTable() {
 		return new GenericCatalogTable(
 			createAnotherTableSchema(),
 			new TableStats(0),
 			createPartitionKeys(),
 			getBatchTableProperties(),
-			TABLE_COMMENT);
-	}
-
-	private List<String> createPartitionKeys() {
-		return Arrays.asList("second", "third");
+			TEST_COMMENT);
 	}
 
 	private CatalogPartitionSpec createPartitionSpec() {
@@ -1053,40 +831,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		return new GenericCatalogPartition(props);
 	}
 
-	private Map<String, String> getBatchTableProperties() {
-		return new HashMap<String, String>() {{
-			put(IS_STREAMING, "false");
-		}};
-	}
-
-	private Map<String, String> getStreamingTableProperties() {
-		return new HashMap<String, String>() {{
-			put(IS_STREAMING, "true");
-		}};
-	}
-
-	private TableSchema createTableSchema() {
-		return new TableSchema(
-			new String[] {"first", "second", "third"},
-			new TypeInformation[] {
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-			}
-		);
-	}
-
-	private TableSchema createAnotherTableSchema() {
-		return new TableSchema(
-			new String[] {"first2", "second", "third"},
-			new TypeInformation[] {
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO
-			}
-		);
-	}
-
 	private CatalogView createView() {
 		return new GenericCatalogView(
 			String.format("select * from %s", t1),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
index f916354..e2157e0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
@@ -30,18 +30,28 @@ import java.util.Optional;
 public interface CatalogBaseTable {
 	/**
 	 * Get the properties of the table.
-	 * @return table property map
+	 *
+	 * @return property map of the table/view
 	 */
 	Map<String, String> getProperties();
 
 	/**
 	 * Get the schema of the table.
-	 * @return schema of the table
+	 *
+	 * @return schema of the table/view.
 	 */
 	TableSchema getSchema();
 
 	/**
+	 * Get the comment of the table or view.
+	 *
+	 * @return comment of the table/view.
+	 */
+	String getComment();
+
+	/**
 	 * Get a deep copy of the CatalogBaseTable instance.
+	 *
 	 * @return a copy of the CatalogBaseTable instance
 	 */
 	CatalogBaseTable copy();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index a8a69f2..5586348 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -96,6 +96,8 @@ public interface ReadableCatalog {
 	 */
 	boolean databaseExists(String databaseName) throws CatalogException;
 
+	// ------ tables and views ------
+
 	/**
 	 * Get names of all tables and views under this database. An empty list is returned if none exists.
 	 *
@@ -116,24 +118,24 @@ public interface ReadableCatalog {
 	List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException;
 
 	/**
-	 * Get a CatalogTable or CatalogView identified by objectPath.
+	 * Get a CatalogTable or CatalogView identified by tablePath.
 	 *
-	 * @param objectPath		Path of the table or view
+	 * @param tablePath		Path of the table or view
 	 * @return The requested table or view
 	 * @throws TableNotExistException if the target does not exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
-	CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException;
+	CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
 
 	/**
 	 * Check if a table or view exists in this catalog.
 	 *
-	 * @param objectPath    Path of the table or view
+	 * @param tablePath    Path of the table or view
 	 * @return true if the given table exists in the catalog
 	 *         false otherwise
 	 * @throws CatalogException in case of any runtime exception
 	 */
-	boolean tableExists(ObjectPath objectPath) throws CatalogException;
+	boolean tableExists(ObjectPath tablePath) throws CatalogException;
 
 	// ------ partitions ------
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 04d9bcb..60bc93d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -102,11 +102,10 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 *                          if set to false, throw an exception,
 	 *                          if set to true, do nothing.
 	 * @throws TableNotExistException if the table does not exist
-	 * @throws DatabaseNotExistException if the database in tablePath to doesn't exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
 	void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-		throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException;
+		throws TableNotExistException, TableAlreadyExistException, CatalogException;
 
 	/**
 	 * Create a new table or view.
@@ -128,7 +127,7 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 * Note that the new and old CatalogBaseTable must be of the same type. For example, this doesn't
 	 * allow alter a regular table to partitioned table, or alter a view to a table, and vice versa.
 	 *
-	 * @param tableName path of the table or view to be modified
+	 * @param tablePath path of the table or view to be modified
 	 * @param newTable the new table definition
 	 * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist:
 	 *                          if set to false, throw an exception,
@@ -136,7 +135,7 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
 	 * @throws TableNotExistException if the table does not exist
 	 * @throws CatalogException in case of any runtime exception
 	 */
-	void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+	void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
 		throws TableNotExistException, CatalogException;
 
 	// ------ partitions ------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
index 91cf133..423d141 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
@@ -23,7 +23,7 @@ package org.apache.flink.table.catalog.exceptions;
  *
  */
 public class DatabaseNotEmptyException extends Exception {
-	private static final String MSG = "Database %s in Catalog %s is not empty.";
+	private static final String MSG = "Database %s in catalog %s is not empty.";
 
 	public DatabaseNotEmptyException(String catalog, String database, Throwable cause) {
 		super(String.format(MSG, database, catalog), cause);
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 5457d34..8117c82 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -29,11 +34,14 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -55,9 +63,9 @@ public abstract class CatalogTestBase {
 	protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
 	protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
 
-	protected static final String TEST_CATALOG_NAME = "test-catalog";
+	public static final String TEST_CATALOG_NAME = "test-catalog";
+
 	protected static final String TEST_COMMENT = "test comment";
-	protected static final String TABLE_COMMENT = "This is my batch table";
 
 	protected static ReadableWritableCatalog catalog;
 
@@ -66,6 +74,16 @@ public abstract class CatalogTestBase {
 
 	@After
 	public void cleanup() throws Exception {
+		if (catalog.tableExists(path1)) {
+			catalog.dropTable(path1, true);
+		}
+		if (catalog.tableExists(path2)) {
+			catalog.dropTable(path2, true);
+		}
+		if (catalog.tableExists(path3)) {
+			catalog.dropTable(path3, true);
+		}
+
 		if (catalog.databaseExists(db1)) {
 			catalog.dropDatabase(db1, true);
 		}
@@ -168,7 +186,7 @@ public abstract class CatalogTestBase {
 		catalog.createTable(path1, createTable(), false);
 
 		exception.expect(DatabaseNotEmptyException.class);
-		exception.expectMessage("Database db1 in Catalog test-catalog is not empty");
+		exception.expectMessage("Database db1 in catalog test-catalog is not empty");
 		catalog.dropDatabase(db1, true);
 	}
 
@@ -209,6 +227,220 @@ public abstract class CatalogTestBase {
 		assertTrue(catalog.databaseExists(db1));
 	}
 
+	// ------ tables ------
+
+	@Test
+	public void testCreateTable_Streaming() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createStreamingTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testCreateTable_Batch() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		// Non-partitioned table
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogBaseTable tableCreated = catalog.getTable(path1);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
+		assertEquals(TEST_COMMENT, tableCreated.getDescription().get());
+
+		List<String> tables = catalog.listTables(db1);
+
+		assertEquals(1, tables.size());
+		assertEquals(path1.getObjectName(), tables.get(0));
+
+		catalog.dropTable(path1, false);
+
+		// Partitioned table
+		table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+		tables = catalog.listTables(db1);
+
+		assertEquals(1, tables.size());
+		assertEquals(path1.getObjectName(), tables.get(0));
+	}
+
+	@Test
+	public void testCreateTable_DatabaseNotExistException() throws Exception {
+		assertFalse(catalog.databaseExists(db1));
+
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.createTable(nonExistObjectPath, createTable(), false);
+	}
+
+	@Test
+	public void testCreateTable_TableAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1,  createTable(), false);
+
+		exception.expect(TableAlreadyExistException.class);
+		exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
+		catalog.createTable(path1, createTable(), false);
+	}
+
+	@Test
+	public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+		catalog.createTable(path1, createAnotherTable(), true);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testGetTable_TableNotExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
+		catalog.getTable(nonExistObjectPath);
+	}
+
+	@Test
+	public void testGetTable_TableNotExistException_NoDb() throws Exception {
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
+		catalog.getTable(nonExistObjectPath);
+	}
+
+	@Test
+	public void testDropTable_nonPartitionedTable() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		assertTrue(catalog.tableExists(path1));
+
+		catalog.dropTable(path1, false);
+
+		assertFalse(catalog.tableExists(path1));
+	}
+
+	@Test
+	public void testDropTable_TableNotExistException() throws Exception {
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+		catalog.dropTable(nonExistDbPath, false);
+	}
+
+	@Test
+	public void testDropTable_TableNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.dropTable(nonExistObjectPath, true);
+	}
+
+	@Test
+	public void testAlterTable() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		// Non-partitioned table
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+		CatalogTable newTable = createAnotherTable();
+		catalog.alterTable(path1, newTable, false);
+
+		assertNotEquals(table, catalog.getTable(path1));
+		CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+
+		catalog.dropTable(path1, false);
+
+		// Partitioned table
+		table = createPartitionedTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+		newTable = createAnotherPartitionedTable();
+		catalog.alterTable(path1, newTable, false);
+
+		CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testAlterTable_TableNotExistException() throws Exception {
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+		catalog.alterTable(nonExistDbPath, createTable(), false);
+	}
+
+	@Test
+	public void testAlterTable_TableNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.alterTable(nonExistObjectPath, createTable(), true);
+
+		assertFalse(catalog.tableExists(nonExistObjectPath));
+	}
+
+	@Test
+	public void testRenameTable_nonPartitionedTable() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+		catalog.renameTable(path1, t2, false);
+
+		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
+		assertFalse(catalog.tableExists(path1));
+	}
+
+	@Test
+	public void testRenameTable_TableNotExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) db1.t1 does not exist in Catalog");
+		catalog.renameTable(path1, t2, false);
+	}
+
+	@Test
+	public void testRenameTable_TableNotExistException_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.renameTable(path1, t2, true);
+	}
+
+	@Test
+	public void testRenameTable_TableAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+		catalog.createTable(path3, createAnotherTable(), false);
+
+		exception.expect(TableAlreadyExistException.class);
+		exception.expectMessage("Table (or view) db1.t2 already exists in Catalog");
+		catalog.renameTable(path1, t2, false);
+	}
+
+	@Test
+	public void testTableExists() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.tableExists(path1));
+
+		catalog.createTable(path1, createTable(), false);
+
+		assertTrue(catalog.tableExists(path1));
+	}
+
 	// ------ utilities ------
 
 	/**
@@ -245,4 +477,63 @@ public abstract class CatalogTestBase {
 	 * @return another CatalogTable instance
 	 */
 	public abstract CatalogTable createAnotherTable();
+
+	/**
+	 * Create a streaming CatalogTable instance; every concrete catalog test must supply its own implementation.
+	 *
+	 * @return a streaming CatalogTable instance
+	 */
+	public abstract CatalogTable createStreamingTable();
+
+	/**
+	 * Create a partitioned CatalogTable instance by specific catalog implementation.
+	 *
+	 * @return a partitioned CatalogTable instance
+	 */
+	public abstract CatalogTable createPartitionedTable();
+
+	/**
+	 * Create another partitioned CatalogTable instance; every concrete catalog test must supply its own implementation.
+	 *
+	 * @return another partitioned CatalogTable instance
+	 */
+	public abstract CatalogTable createAnotherPartitionedTable();
+
+	protected TableSchema createTableSchema() {
+		return new TableSchema(
+			new String[] {"first", "second", "third"},
+			new TypeInformation[] {
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+			}
+		);
+	}
+
+	protected TableSchema createAnotherTableSchema() {
+		return new TableSchema(
+			new String[] {"first2", "second", "third"},
+			new TypeInformation[] {
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO
+			}
+		);
+	}
+
+	protected List<String> createPartitionKeys() {
+		return Arrays.asList("second", "third"); // both names are columns of createTableSchema() and createAnotherTableSchema()
+	}
+
+	protected Map<String, String> getBatchTableProperties() {
+		Map<String, String> batchProps = new HashMap<>(); // plain map: no anonymous HashMap subclass capturing this test instance
+		batchProps.put(IS_STREAMING, "false"); // marks the table as batch
+		return batchProps;
+	}
+
+	protected Map<String, String> getStreamingTableProperties() {
+		Map<String, String> streamingProps = new HashMap<>(); // plain map: no anonymous HashMap subclass capturing this test instance
+		streamingProps.put(IS_STREAMING, "true"); // marks the table as streaming
+		return streamingProps;
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 0700736..57341fe 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,26 +33,26 @@ public class CatalogTestUtil {
 		assertEquals(t1.getDescription(), t2.getDescription());
 	}
 
-	protected static void checkEquals(TableStats ts1, TableStats ts2) {
+	public static void checkEquals(TableStats ts1, TableStats ts2) { // compares row count and the number of column stats only
 		assertEquals(ts1.getRowCount(), ts2.getRowCount());
 		assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
 	}
 
-	protected static void checkEquals(CatalogView v1, CatalogView v2) {
+	public static void checkEquals(CatalogView v1, CatalogView v2) { // compares the original and the expanded query only
 		assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
 		assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
 	}
 
-	protected static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
+	public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) { // compares the database properties only
 		assertEquals(d1.getProperties(), d2.getProperties());
 	}
 
-	protected static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
+	public static void checkEquals(CatalogFunction f1, CatalogFunction f2) { // compares class name and properties
 		assertEquals(f1.getClassName(), f2.getClassName());
 		assertEquals(f1.getProperties(), f2.getProperties());
 	}
 
-	protected static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
+	public static void checkEquals(CatalogPartition p1, CatalogPartition p2) { // compares the partition properties only
 		assertEquals(p1.getProperties(), p2.getProperties());
 	}
 }