You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/06 22:05:00 UTC

[flink] 01/01: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch gview
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 111a6c8966e92e3166c8c025bb5e7a06a59c16af
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon May 6 15:04:32 2019 -0700

    [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  |  14 +-
 .../hive/GenericHiveMetastoreCatalogUtil.java      |  36 ++++-
 .../hive/GenericHiveMetastoreCatalogTest.java      |  22 +++
 .../flink/table/catalog/hive/HiveCatalogTest.java  |  66 +++++---
 .../table/catalog/GenericInMemoryCatalogTest.java  | 180 +++------------------
 .../flink/table/catalog/CatalogTestBase.java       | 153 +++++++++++++++++-
 6 files changed, 281 insertions(+), 190 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index bb431cc..e178acd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -188,7 +189,18 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 
 	@Override
 	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			// Ask the metastore only for VIRTUAL_VIEW entries so regular tables are left out.
+			return client.getTables(
+				databaseName,
+				null, // table pattern
+				TableType.VIRTUAL_VIEW);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list views in database %s", databaseName), e);
+		}
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
index 0e564f5..4d07c20 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -23,10 +23,13 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.plan.stats.TableStats;
 
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -75,7 +78,6 @@ public class GenericHiveMetastoreCatalogUtil {
 
 	/**
 	 * Creates a Hive table from a CatalogBaseTable.
-	 * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
 	 *
 	 * @param tablePath path of the table
 	 * @param table the CatalogBaseTable instance
@@ -118,18 +120,26 @@ public class GenericHiveMetastoreCatalogUtil {
 				hiveTable.setPartitionKeys(new ArrayList<>());
 			}
 
-			hiveTable.setSd(sd);
+			hiveTable.setTableType(TableType.EXTERNAL_TABLE.name());
 		} else {
-			// TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
-			throw new UnsupportedOperationException();
+			CatalogView view = (CatalogView) table;
+
+			// TODO: [FLINK-12398] Support partitioned view in catalog API
+			sd.setCols(allColumns);
+			hiveTable.setPartitionKeys(new ArrayList<>());
+
+			hiveTable.setViewOriginalText(view.getOriginalQuery());
+			hiveTable.setViewExpandedText(view.getExpandedQuery());
+			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
 		}
 
+		hiveTable.setSd(sd);
+
 		return hiveTable;
 	}
 
 	/**
 	 * Creates a CatalogBaseTable from a Hive table.
-	 * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
 	 *
 	 * @param hiveTable the Hive table
 	 * @return a CatalogBaseTable
@@ -148,14 +158,24 @@ public class GenericHiveMetastoreCatalogUtil {
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
 
-		if (hiveTable.getPartitionKeys() != null && hiveTable.getPartitionKeys().isEmpty()) {
+		if (!hiveTable.getPartitionKeys().isEmpty()) {
 			partitionKeys = hiveTable.getPartitionKeys().stream()
 								.map(fs -> fs.getName())
 								.collect(Collectors.toList());
 		}
 
-		return new GenericCatalogTable(
-			tableSchema, new TableStats(0), partitionKeys, properties, comment);
+		if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+			return new GenericCatalogView(
+				hiveTable.getViewOriginalText(),
+				hiveTable.getViewExpandedText(),
+				tableSchema,
+				properties,
+				comment
+			);
+		} else {
+			return new GenericCatalogTable(
+				tableSchema, new TableStats(0), partitionKeys, properties, comment);
+		}
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 76f8e08..3cebd55 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -27,8 +27,10 @@ import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.plan.stats.TableStats;
 
 import org.junit.BeforeClass;
@@ -162,4 +164,24 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 			getBatchTableProperties(),
 			TEST_COMMENT);
 	}
+
+	@Override
+	public CatalogView createView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t1),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is a view");
+	}
+
+	@Override
+	public CatalogView createAnotherView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t2),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is another view");
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 7b132c9..dfd7fcf 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.catalog.hive;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogView;
 
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -44,82 +44,96 @@ public class HiveCatalogTest extends CatalogTestBase {
 	// =====================
 
 	// TODO: re-enable these tests once HiveCatalog support table operations
-	@Test
 	public void testDropDb_DatabaseNotEmptyException() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_Streaming() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_Batch() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_DatabaseNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_TableAlreadyExistException() throws Exception {
 	}
 
-	@Test
 	public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
 	}
 
-	@Test
 	public void testGetTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testGetTable_TableNotExistException_NoDb() throws Exception {
 	}
 
-	@Test
 	public void testDropTable_nonPartitionedTable() throws Exception {
 	}
 
-	@Test
 	public void testDropTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testDropTable_TableNotExist_ignored() throws Exception {
 	}
 
-	@Test
 	public void testAlterTable() throws Exception {
 	}
 
-	@Test
 	public void testAlterTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testAlterTable_TableNotExist_ignored() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_nonPartitionedTable() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_TableNotExistException() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_TableNotExistException_ignored() throws Exception {
 	}
 
-	@Test
 	public void testRenameTable_TableAlreadyExistException() throws Exception {
 	}
 
-	@Test
+	public void testListTables() throws Exception {
+	}
+
 	public void testTableExists() throws Exception {
 	}
 
+	public void testCreateView() throws Exception {
+	}
+
+	public void testCreateView_DatabaseNotExistException() throws Exception {
+	}
+
+	public void testCreateView_TableAlreadyExistException() throws Exception {
+	}
+
+	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
+	}
+
+	public void testDropView() throws Exception {
+	}
+
+	public void testAlterView() throws Exception {
+	}
+
+	public void testAlterView_TableNotExistException() throws Exception {
+	}
+
+	public void testAlterView_TableNotExist_ignored() throws Exception {
+	}
+
+	public void testListView() throws Exception {
+	}
+
+	public void testRenameView() throws Exception {
+	}
+
 	// ------ utils ------
 
 	@Override
@@ -176,4 +190,16 @@ public class HiveCatalogTest extends CatalogTestBase {
 		// TODO: implement this once HiveCatalog support table operations
 		return null;
 	}
+
+	@Override
+	public CatalogView createView() {
+		// TODO: implement this once HiveCatalog support view operations
+		return null;
+	}
+
+	@Override
+	public CatalogView createAnotherView() {
+		// TODO: implement this once HiveCatalog support view operations
+		return null;
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 40a0d9f..25a2635 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.functions.ScalarFunction;
@@ -36,7 +35,6 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -82,23 +80,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	@Test
-	public void testListTables() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		catalog.createTable(path1, createTable(), false);
-		catalog.createTable(path3, createTable(), false);
-		catalog.createTable(path4, createView(), false);
-
-		assertEquals(3, catalog.listTables(db1).size());
-		assertEquals(1, catalog.listViews(db1).size());
-
-		catalog.dropTable(path1, false);
-		catalog.dropTable(path3, false);
-		catalog.dropTable(path4, false);
-		catalog.dropDatabase(db1, false);
-	}
-
-	@Test
 	public void testRenameTable_partitionedTable() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createPartitionedTable();
@@ -118,129 +99,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
 	}
 
-	// ------ views ------
-
-	@Test
-	public void testCreateView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		assertFalse(catalog.tableExists(path1));
-
-		CatalogView view = createView();
-		catalog.createTable(path1, view, false);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testCreateView_DatabaseNotExistException() throws Exception {
-		assertFalse(catalog.databaseExists(db1));
-
-		exception.expect(DatabaseNotExistException.class);
-		exception.expectMessage("Database db1 does not exist in Catalog");
-		catalog.createTable(nonExistObjectPath, createView(), false);
-	}
-
-	@Test
-	public void testCreateView_TableAlreadyExistException() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createView(), false);
-
-		exception.expect(TableAlreadyExistException.class);
-		exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
-		catalog.createTable(path1, createView(), false);
-	}
-
-	@Test
-	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogView view = createView();
-		catalog.createTable(path1, view, false);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-
-		catalog.createTable(path1, createAnotherView(), true);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testDropView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.createTable(path1, createView(), false);
-
-		assertTrue(catalog.tableExists(path1));
-
-		catalog.dropTable(path1, false);
-
-		assertFalse(catalog.tableExists(path1));
-	}
-
-	@Test
-	public void testAlterView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		CatalogView view = createView();
-		catalog.createTable(path1, view, false);
-
-		CatalogTestUtil.checkEquals(view, (GenericCatalogView) catalog.getTable(path1));
-
-		CatalogView newView = createAnotherView();
-		catalog.alterTable(path1, newView, false);
-
-		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(newView, (GenericCatalogView) catalog.getTable(path1));
-	}
-
-	@Test
-	public void testAlterView_TableNotExistException() throws Exception {
-		exception.expect(TableNotExistException.class);
-		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
-		catalog.alterTable(nonExistDbPath, createTable(), false);
-	}
-
-	@Test
-	public void testAlterView_TableNotExist_ignored() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-		catalog.alterTable(nonExistObjectPath, createView(), true);
-
-		assertFalse(catalog.tableExists(nonExistObjectPath));
-	}
-
-	@Test
-	public void testListView() throws Exception {
-		catalog.createDatabase(db1, createDb(), false);
-
-		assertTrue(catalog.listTables(db1).isEmpty());
-
-		catalog.createTable(path1, createView(), false);
-		catalog.createTable(path3, createTable(), false);
-
-		assertEquals(2, catalog.listTables(db1).size());
-		assertEquals(new HashSet<>(Arrays.asList(path1.getObjectName(), path3.getObjectName())),
-			new HashSet<>(catalog.listTables(db1)));
-		assertEquals(Arrays.asList(path1.getObjectName()), catalog.listViews(db1));
-	}
-
-	@Test
-	public void testRenameView() throws Exception {
-		catalog.createDatabase("db1", new GenericCatalogDatabase(new HashMap<>()), false);
-		GenericCatalogView view = new GenericCatalogView("select * from t1",
-			"select * from db1.t1", createTableSchema(), new HashMap<>());
-		ObjectPath viewPath1 = new ObjectPath(db1, "view1");
-		catalog.createTable(viewPath1, view, false);
-		assertTrue(catalog.tableExists(viewPath1));
-		catalog.renameTable(viewPath1, "view2", false);
-		assertFalse(catalog.tableExists(viewPath1));
-		ObjectPath viewPath2 = new ObjectPath(db1, "view2");
-		assertTrue(catalog.tableExists(viewPath2));
-		catalog.dropTable(viewPath2, false);
-	}
-
 	// ------ partitions ------
 
 	@Test
@@ -789,6 +647,26 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 			TEST_COMMENT);
 	}
 
+	@Override
+	public CatalogView createView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t1),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is a view");
+	}
+
+	@Override
+	public CatalogView createAnotherView() {
+		return new GenericCatalogView(
+			String.format("select * from %s", t2),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is another view");
+	}
+
 	private CatalogPartitionSpec createPartitionSpec() {
 		return new CatalogPartitionSpec(
 			new HashMap<String, String>() {{
@@ -831,24 +709,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		return new GenericCatalogPartition(props);
 	}
 
-	private CatalogView createView() {
-		return new GenericCatalogView(
-			String.format("select * from %s", t1),
-			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
-			createTableSchema(),
-			new HashMap<>(),
-			"This is a view");
-	}
-
-	private CatalogView createAnotherView() {
-		return new GenericCatalogView(
-			String.format("select * from %s", t2),
-			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
-			createTableSchema(),
-			new HashMap<>(),
-			"This is another view");
-	}
-
 	protected CatalogFunction createFunction() {
 		return new GenericCatalogFunction(MyScalarFunction.class.getName());
 	}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 470a9a9..81c9cba 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -56,10 +56,11 @@ public abstract class CatalogTestBase {
 
 	protected final String t1 = "t1";
 	protected final String t2 = "t2";
+	protected final String t3 = "t3";
 	protected final ObjectPath path1 = new ObjectPath(db1, t1);
 	protected final ObjectPath path2 = new ObjectPath(db2, t2);
 	protected final ObjectPath path3 = new ObjectPath(db1, t2);
-	protected final ObjectPath path4 = new ObjectPath(db1, "t3");
+	protected final ObjectPath path4 = new ObjectPath(db1, t3);
 	protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
 	protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
 
@@ -83,6 +84,9 @@ public abstract class CatalogTestBase {
 		if (catalog.tableExists(path3)) {
 			catalog.dropTable(path3, true);
 		}
+		if (catalog.tableExists(path4)) {
+			catalog.dropTable(path4, true);
+		}
 
 		if (catalog.databaseExists(db1)) {
 			catalog.dropDatabase(db1, true);
@@ -436,6 +440,18 @@ public abstract class CatalogTestBase {
 	}
 
 	@Test
+	public void testListTables() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		catalog.createTable(path1, createTable(), false);
+		catalog.createTable(path3, createTable(), false);
+		catalog.createTable(path4, createView(), false);
+
+		assertEquals(3, catalog.listTables(db1).size());
+		assertEquals(1, catalog.listViews(db1).size());
+	}
+
+	@Test
 	public void testTableExists() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 
@@ -446,6 +462,127 @@ public abstract class CatalogTestBase {
 		assertTrue(catalog.tableExists(path1));
 	}
 
+	// ------ views ------
+
+	@Test
+	public void testCreateView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.tableExists(path1));
+
+		CatalogView view = createView();
+		catalog.createTable(path1, view, false);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testCreateView_DatabaseNotExistException() throws Exception {
+		assertFalse(catalog.databaseExists(db1));
+
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.createTable(nonExistObjectPath, createView(), false);
+	}
+
+	@Test
+	public void testCreateView_TableAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		exception.expect(TableAlreadyExistException.class);
+		exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
+		catalog.createTable(path1, createView(), false);
+	}
+
+	@Test
+	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogView view = createView();
+		catalog.createTable(path1, view, false);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+
+		catalog.createTable(path1, createAnotherView(), true);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testDropView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		assertTrue(catalog.tableExists(path1));
+
+		catalog.dropTable(path1, false);
+
+		assertFalse(catalog.tableExists(path1));
+	}
+
+	@Test
+	public void testAlterView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogView view = createView();
+		catalog.createTable(path1, view, false);
+
+		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+
+		CatalogView newView = createAnotherView();
+		catalog.alterTable(path1, newView, false);
+
+		assertTrue(catalog.getTable(path1) instanceof CatalogView);
+		CatalogTestUtil.checkEquals(newView, (CatalogView) catalog.getTable(path1));
+	}
+
+	@Test
+	public void testAlterView_TableNotExistException() throws Exception {
+		exception.expect(TableNotExistException.class);
+		exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+		catalog.alterTable(nonExistDbPath, createView(), false);
+	}
+
+	@Test
+	public void testAlterView_TableNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.alterTable(nonExistObjectPath, createView(), true);
+
+		assertFalse(catalog.tableExists(nonExistObjectPath));
+	}
+
+	@Test
+	public void testListView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertTrue(catalog.listTables(db1).isEmpty());
+
+		catalog.createTable(path1, createView(), false);
+		catalog.createTable(path3, createTable(), false);
+
+		assertEquals(2, catalog.listTables(db1).size());
+		assertEquals(new HashSet<>(Arrays.asList(path1.getObjectName(), path3.getObjectName())),
+			new HashSet<>(catalog.listTables(db1)));
+		assertEquals(Arrays.asList(path1.getObjectName()), catalog.listViews(db1));
+	}
+
+	@Test
+	public void testRenameView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		assertTrue(catalog.tableExists(path1));
+
+		catalog.renameTable(path1, t2, false);
+
+		assertFalse(catalog.tableExists(path1));
+		assertTrue(catalog.tableExists(path3));
+	}
+
 	// ------ utilities ------
 
 	/**
@@ -504,6 +641,20 @@ public abstract class CatalogTestBase {
 	 */
 	public abstract CatalogTable createAnotherPartitionedTable();
 
+	/**
+	 * Create a CatalogView instance by specific catalog implementation.
+	 *
+	 * @return a CatalogView instance
+	 */
+	public abstract CatalogView createView();
+
+	/**
+	 * Create another CatalogView instance by specific catalog implementation.
+	 *
+	 * @return another CatalogView instance
+	 */
+	public abstract CatalogView createAnotherView();
+
 	protected TableSchema createTableSchema() {
 		return new TableSchema(
 			new String[] {"first", "second", "third"},