You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/06 22:04:59 UTC

[flink] branch gview created (now 111a6c8)

This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch gview
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at 111a6c8  [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

This branch includes the following new commits:

     new 111a6c8  [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 01/01: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch gview
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 111a6c8966e92e3166c8c025bb5e7a06a59c16af
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon May 6 15:04:32 2019 -0700

    [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  |  14 +-
 .../hive/GenericHiveMetastoreCatalogUtil.java      |  36 ++++-
 .../hive/GenericHiveMetastoreCatalogTest.java      |  22 +++
 .../flink/table/catalog/hive/HiveCatalogTest.java  |  66 +++++---
 .../table/catalog/GenericInMemoryCatalogTest.java  | 180 +++------------------
 .../flink/table/catalog/CatalogTestBase.java       | 153 +++++++++++++++++-
 6 files changed, 281 insertions(+), 190 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index bb431cc..e178acd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -188,7 +189,18 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 
 	@Override
 	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+
+			return client.getTables(
+				databaseName,
+				null, // table pattern
+				TableType.VIRTUAL_VIEW);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list tables in database %s", databaseName), e);
+		}
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
index 0e564f5..4d07c20 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -23,10 +23,13 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.plan.stats.TableStats;
 
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -75,7 +78,6 @@ public class GenericHiveMetastoreCatalogUtil {
 
 	/**
 	 * Creates a Hive table from a CatalogBaseTable.
-	 * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
 	 *
 	 * @param tablePath path of the table
 	 * @param table the CatalogBaseTable instance
@@ -118,18 +120,26 @@ public class GenericHiveMetastoreCatalogUtil {
 				hiveTable.setPartitionKeys(new ArrayList<>());
 			}
 
-			hiveTable.setSd(sd);
+			hiveTable.setTableType(TableType.EXTERNAL_TABLE.name());
 		} else {
-			// TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
-			throw new UnsupportedOperationException();
+			CatalogView view = (CatalogView) table;
+
+			// TODO: [FLINK-12398] Support partitioned view in catalog API
+			sd.setCols(allColumns);
+			hiveTable.setPartitionKeys(new ArrayList<>());
+
+			hiveTable.setViewOriginalText(view.getOriginalQuery());
+			hiveTable.setViewExpandedText(view.getExpandedQuery());
+			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
 		}
 
+		hiveTable.setSd(sd);
+
 		return hiveTable;
 	}
 
 	/**
 	 * Creates a CatalogBaseTable from a Hive table.
-	 * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
 	 *
 	 * @param hiveTable the Hive table
 	 * @return a CatalogBaseTable
@@ -148,14 +158,24 @@ public class GenericHiveMetastoreCatalogUtil {
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
 
-		if (hiveTable.getPartitionKeys() != null && hiveTable.getPartitionKeys().isEmpty()) {
+		if (hiveTable.getPartitionKeys().isEmpty()) {
 			partitionKeys = hiveTable.getPartitionKeys().stream()
 								.map(fs -> fs.getName())
 								.collect(Collectors.toList());
 		}
 
-		return new GenericCatalogTable(
-			tableSchema, new TableStats(0), partitionKeys, properties, comment);
+		if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+			return new GenericCatalogView(
+				hiveTable.getViewOriginalText(),
+				hiveTable.getViewExpandedText(),
+				tableSchema,
+				properties,
+				comment
+			);
+		} else {
+			return new GenericCatalogTable(
+				tableSchema, new TableStats(0), partitionKeys, properties, comment);
+		}
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 76f8e08..3cebd55 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -27,8 +27,10 @@ import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.plan.stats.TableStats;
 
 import org.junit.BeforeClass;
@@ -162,4 +164,24 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 			getBatchTableProperties(),
 			TEST_COMMENT);
 	}
+
+	@Override
+	public CatalogView createView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t1),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is a view");
+	}
+
+	@Override
+	public CatalogView createAnotherView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t2),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is another view");
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 7b132c9..dfd7fcf 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.catalog.hive;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogView;
 
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -44,82 +44,96 @@ public class HiveCatalogTest extends CatalogTestBase {
 	// =====================
 
 	// TODO: re-enable these tests once HiveCatalog support table operations
-	@Test
 	public void testDropDb_DatabaseNotEmptyException() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_Streaming() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_Batch() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_DatabaseNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_TableAlreadyExistException() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
 	}
 
-	@Test
 	public void testGetTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testGetTable_TableNotExistException_NoDb() throws Exception {
 	}
 
-	@Test
 	public void testDropTable_nonPartitionedTable() throws Exception {
 	}
 
-	@Test
 	public void testDropTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testDropTable_TableNotExist_ignored() throws Exception {
 	}
 
-	@Test
 	public void testAlterTable() throws Exception {
 	}
 
-	@Test
 	public void testAlterTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testAlterTable_TableNotExist_ignored() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_nonPartitionedTable() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_TableNotExistException_ignored() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_TableAlreadyExistException() throws Exception {
 	}
 
-	@Test
+	public void testListTables() throws Exception {
+	}
+
 	public void testTableExists() throws Exception {
 	}
 
+	public void testCreateView() throws Exception {
+	}
+
+	public void testCreateView_DatabaseNotExistException() throws Exception {
+	}
+
+	public void testCreateView_TableAlreadyExistException() throws Exception {
+	}
+
+	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
+	}
+
+	public void testDropView() throws Exception {
+	}
+
+	public void testAlterView() throws Exception {
+	}
+
+	public void testAlterView_TableNotExistException() throws Exception {
+	}
+
+	public void testAlterView_TableNotExist_ignored() throws Exception {
+	}
+
+	public void testListView() throws Exception {
+	}
+
+	public void testRenameView() throws Exception {
+	}
+
 	// ------ utils ------
 
 	@Override
@@ -176,4 +190,16 @@ public class HiveCatalogTest extends CatalogTestBase {
 		// TODO: implement this once HiveCatalog support table operations
 		return null;
 	}
+
+	@Override
+	public CatalogView createView() {
+		// TODO: implement this once HiveCatalog support view operations
+		return null;
+	}
+
+	@Override
+	public CatalogView createAnotherView() {
+		// TODO: implement this once HiveCatalog support view operations
+		return null;
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 40a0d9f..25a2635 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.functions.ScalarFunction;
@@ -36,7 +35,6 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -82,23 +80,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	@Test
-	public void testListTables() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		catalog.createTable(path1, createTable(), false);
-		catalog.createTable(path3, createTable(), false);
-		catalog.createTable(path4, createView(), false);
-
-		assertEquals(3, catalog.listTables(db1).size());
-		assertEquals(1, catalog.listViews(db1).size());
-
-		catalog.dropTable(path1, false);
-		catalog.dropTable(path3, false);
-		catalog.dropTable(path4, false);
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
 	public void testRenameTable_partitionedTable() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createPartitionedTable();
@@ -118,129 +99,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
 	}
 
-	// ------ views ------
-
-	@Test
-	public void testCreateView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		assertFalse(catalog.tableExists(path1));
-
-		CatalogView view = createView();
-		catalog.createTable(path1, view, false);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testCreateView_DatabaseNotExistException() throws Exception {
-		assertFalse(catalog.databaseExists(db1));
-
-		exception.expect(DatabaseNotExistException.class);
-		exception.expectMessage("Database db1 does not exist in Catalog");
-		catalog.createTable(nonExistObjectPath, createView(), false);
-	}
-
-	@Test
-	public void testCreateView_TableAlreadyExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createView(), false);
-
-		exception.expect(TableAlreadyExistException.class);
-		exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
-		catalog.createTable(path1, createView(), false);
-	}
-
-	@Test
-	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogView view = createView();
-		catalog.createTable(path1, view, false);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-
-		catalog.createTable(path1, createAnotherView(), true);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testDropView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createView(), false);
-
-		assertTrue(catalog.tableExists(path1));
-
-		catalog.dropTable(path1, false);
-
-		assertFalse(catalog.tableExists(path1));
-	}
-
-	@Test
-	public void testAlterView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogView view = createView();
-		catalog.createTable(path1, view, false);
-
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-
-		CatalogView newView = createAnotherView();
-		catalog.alterTable(path1, newView, false);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(newView, (GenericCatalogView) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testAlterView_TableNotExistException() throws Exception {
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
-		catalog.alterTable(nonExistDbPath, createTable(), false);
-	}
-
-	@Test
-	public void testAlterView_TableNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.alterTable(nonExistObjectPath, createView(), true);
-
-		assertFalse(catalog.tableExists(nonExistObjectPath));
-	}
-
-	@Test
-	public void testListView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		assertTrue(catalog.listTables(db1).isEmpty());
-
-		catalog.createTable(path1, createView(), false);
-		catalog.createTable(path3, createTable(), false);
-
-		assertEquals(2, catalog.listTables(db1).size());
-		assertEquals(new HashSet<>(Arrays.asList(path1.getObjectName(), path3.getObjectName())),
-			new HashSet<>(catalog.listTables(db1)));
-		assertEquals(Arrays.asList(path1.getObjectName()), catalog.listViews(db1));
-	}
-
-	@Test
-	public void testRenameView() throws Exception {
-		catalog.createDatabase("db1", new GenericCatalogDatabase(new HashMap<>()), false);
-		GenericCatalogView view = new GenericCatalogView("select * from t1",
-			"select * from db1.t1", createTableSchema(), new HashMap<>());
-		ObjectPath viewPath1 = new ObjectPath(db1, "view1");
-		catalog.createTable(viewPath1, view, false);
-		assertTrue(catalog.tableExists(viewPath1));
-		catalog.renameTable(viewPath1, "view2", false);
-		assertFalse(catalog.tableExists(viewPath1));
-		ObjectPath viewPath2 = new ObjectPath(db1, "view2");
-		assertTrue(catalog.tableExists(viewPath2));
-		catalog.dropTable(viewPath2, false);
-	}
-
 	// ------ partitions ------
 
 	@Test
@@ -789,6 +647,26 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			TEST_COMMENT);
 	}
 
+	@Override
+	public CatalogView createView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t1),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is a view");
+	}
+
+	@Override
+	public CatalogView createAnotherView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t2),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is another view");
+	}
+
 	private CatalogPartitionSpec createPartitionSpec() {
 		return new CatalogPartitionSpec(
 			new HashMap<String, String>() {{
@@ -831,24 +709,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		return new GenericCatalogPartition(props);
 	}
 
-	private CatalogView createView() {
-		return new GenericCatalogView(
-			String.format("select * from %s", t1),
-			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
-			createTableSchema(),
-			new HashMap<>(),
-			"This is a view");
-	}
-
-	private CatalogView createAnotherView() {
-		return new GenericCatalogView(
-			String.format("select * from %s", t2),
-			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
-			createTableSchema(),
-			new HashMap<>(),
-			"This is another view");
-	}
-
 	protected CatalogFunction createFunction() {
 		return new GenericCatalogFunction(MyScalarFunction.class.getName());
 	}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 470a9a9..81c9cba 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -56,10 +56,11 @@ public abstract class CatalogTestBase {
 
 	protected final String t1 = "t1";
 	protected final String t2 = "t2";
+	protected final String t3 = "t3";
 	protected final ObjectPath path1 = new ObjectPath(db1, t1);
 	protected final ObjectPath path2 = new ObjectPath(db2, t2);
 	protected final ObjectPath path3 = new ObjectPath(db1, t2);
-	protected final ObjectPath path4 = new ObjectPath(db1, "t3");
+	protected final ObjectPath path4 = new ObjectPath(db1, t3);
 	protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
 	protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
 
@@ -83,6 +84,9 @@ public abstract class CatalogTestBase {
 		if (catalog.tableExists(path3)) {
 			catalog.dropTable(path3, true);
 		}
+		if (catalog.tableExists(path4)) {
+			catalog.dropTable(path4, true);
+		}
 
 		if (catalog.databaseExists(db1)) {
 			catalog.dropDatabase(db1, true);
@@ -436,6 +440,18 @@ public abstract class CatalogTestBase {
 	}
 
 	@Test
+	public void testListTables() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		catalog.createTable(path1, createTable(), false);
+		catalog.createTable(path3, createTable(), false);
+		catalog.createTable(path4, createView(), false);
+
+		assertEquals(3, catalog.listTables(db1).size());
+		assertEquals(1, catalog.listViews(db1).size());
+	}
+
+	@Test
 	public void testTableExists() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 
@@ -446,6 +462,127 @@ public abstract class CatalogTestBase {
 		assertTrue(catalog.tableExists(path1));
 	}
 
+	// ------ views ------
+
+	@Test
+	public void testCreateView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.tableExists(path1));
+
+		CatalogView view = createView();
+		catalog.createTable(path1, view, false);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testCreateView_DatabaseNotExistException() throws Exception {
+		assertFalse(catalog.databaseExists(db1));
+
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.createTable(nonExistObjectPath, createView(), false);
+	}
+
+	@Test
+	public void testCreateView_TableAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		exception.expect(TableAlreadyExistException.class);
+		exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
+		catalog.createTable(path1, createView(), false);
+	}
+
+	@Test
+	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogView view = createView();
+		catalog.createTable(path1, view, false);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+
+		catalog.createTable(path1, createAnotherView(), true);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testDropView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		assertTrue(catalog.tableExists(path1));
+
+		catalog.dropTable(path1, false);
+
+		assertFalse(catalog.tableExists(path1));
+	}
+
+	@Test
+	public void testAlterView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogView view = createView();
+		catalog.createTable(path1, view, false);
+
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+
+		CatalogView newView = createAnotherView();
+		catalog.alterTable(path1, newView, false);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(newView, (CatalogView) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testAlterView_TableNotExistException() throws Exception {
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+		catalog.alterTable(nonExistDbPath, createTable(), false);
+	}
+
+	@Test
+	public void testAlterView_TableNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.alterTable(nonExistObjectPath, createView(), true);
+
+		assertFalse(catalog.tableExists(nonExistObjectPath));
+	}
+
+	@Test
+	public void testListView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertTrue(catalog.listTables(db1).isEmpty());
+
+		catalog.createTable(path1, createView(), false);
+		catalog.createTable(path3, createTable(), false);
+
+		assertEquals(2, catalog.listTables(db1).size());
+		assertEquals(new HashSet<>(Arrays.asList(path1.getObjectName(), path3.getObjectName())),
+			new HashSet<>(catalog.listTables(db1)));
+		assertEquals(Arrays.asList(path1.getObjectName()), catalog.listViews(db1));
+	}
+
+	@Test
+	public void testRenameView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		assertTrue(catalog.tableExists(path1));
+
+		catalog.renameTable(path1, t2, false);
+
+		assertFalse(catalog.tableExists(path1));
+		assertTrue(catalog.tableExists(path3));
+	}
+
 	// ------ utilities ------
 
 	/**
@@ -504,6 +641,20 @@ public abstract class CatalogTestBase {
 	 */
 	public abstract CatalogTable createAnotherPartitionedTable();
 
+	/**
+	 * Create a CatalogView instance by specific catalog implementation.
+	 *
+	 * @return a CatalogView instance
+	 */
+	public abstract CatalogView createView();
+
+	/**
+	 * Create another CatalogView instance by specific catalog implementation.
+	 *
+	 * @return another CatalogView instance
+	 */
+	public abstract CatalogView createAnotherView();
+
 	protected TableSchema createTableSchema() {
 		return new TableSchema(
 			new String[] {"first", "second", "third"},