You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/14 23:49:03 UTC

[flink] branch master updated: [FLINK-12234][hive] Support view related operations in HiveCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 544903d  [FLINK-12234][hive] Support view related operations in HiveCatalog
544903d is described below

commit 544903d5cc77d6f90e0d77cb21ebf008b435e444
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon May 13 16:48:27 2019 -0700

    [FLINK-12234][hive] Support view related operations in HiveCatalog
    
    This PR supports view related operations in HiveCatalog and creates HiveCatalogView.
    
    This closes #8434.
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  |  2 +-
 .../flink/table/catalog/hive/HiveCatalog.java      | 33 ++++++++--
 .../flink/table/catalog/hive/HiveCatalogView.java  | 72 ++++++++++++----------
 .../flink/table/catalog/hive/HiveCatalogTest.java  | 67 +++++++-------------
 .../flink/table/catalog/GenericCatalogView.java    | 14 ++---
 .../flink/table/catalog/CatalogTestBase.java       | 18 ++++--
 .../flink/table/catalog/CatalogTestUtil.java       |  8 ---
 7 files changed, 109 insertions(+), 105 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 8558576..b5dfb9c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -198,7 +198,7 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 			hiveTable.setViewExpandedText(view.getExpandedQuery());
 			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
 		} else {
-			throw new IllegalArgumentException(
+			throw new CatalogException(
 				"GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
 		}
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 0619aa3..66826eb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -40,6 +39,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -98,8 +98,7 @@ public class HiveCatalog extends HiveCatalogBase {
 	@Override
 	protected void validateCatalogBaseTable(CatalogBaseTable table)
 			throws CatalogException {
-		// TODO: validate HiveCatalogView
-		if (!(table instanceof HiveCatalogTable)) {
+		if (!(table instanceof HiveCatalogTable) && !(table instanceof HiveCatalogView)) {
 			throw new CatalogException(
 				"HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
 		}
@@ -126,7 +125,18 @@ public class HiveCatalog extends HiveCatalogBase {
 				.collect(Collectors.toList());
 		}
 
-		return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
+		if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+			return new HiveCatalogView(
+				hiveTable.getViewOriginalText(),
+				hiveTable.getViewExpandedText(),
+				tableSchema,
+				properties,
+				comment
+			);
+		} else {
+			return new HiveCatalogTable(
+				tableSchema, partitionKeys, properties, comment);
+		}
 	}
 
 	@Override
@@ -153,7 +163,7 @@ public class HiveCatalog extends HiveCatalogBase {
 		List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
 
 		// Table columns and partition keys
-		if (table instanceof CatalogTable) {
+		if (table instanceof HiveCatalogTable) {
 			HiveCatalogTable catalogTable = (HiveCatalogTable) table;
 
 			if (catalogTable.isPartitioned()) {
@@ -167,8 +177,19 @@ public class HiveCatalog extends HiveCatalogBase {
 				sd.setCols(allColumns);
 				hiveTable.setPartitionKeys(new ArrayList<>());
 			}
+		} else if (table instanceof HiveCatalogView) {
+			HiveCatalogView view = (HiveCatalogView) table;
+
+			// TODO: [FLINK-12398] Support partitioned view in catalog API
+			sd.setCols(allColumns);
+			hiveTable.setPartitionKeys(new ArrayList<>());
+
+			hiveTable.setViewOriginalText(view.getOriginalQuery());
+			hiveTable.setViewExpandedText(view.getExpandedQuery());
+			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
 		} else {
-			throw new UnsupportedOperationException("HiveCatalog doesn't support view yet");
+			throw new CatalogException(
+				"HiveCatalog only supports HiveCatalogTable and HiveCatalogView");
 		}
 
 		hiveTable.setSd(sd);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
similarity index 55%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
index c067650..968bfd8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
@@ -16,18 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A generic catalog view implementation.
+ * A Hive catalog view implementation.
  */
-public class GenericCatalogView implements CatalogView {
+public class HiveCatalogView implements CatalogView {
 	// Original text of the view definition.
 	private final String originalQuery;
 
@@ -37,32 +43,37 @@ public class GenericCatalogView implements CatalogView {
 	// Expanded query text takes care of the this, as an example.
 	private final String expandedQuery;
 
-	private final TableSchema schema;
+	// Schema of the view (column names and types)
+	private final TableSchema tableSchema;
+	// Properties of the view
 	private final Map<String, String> properties;
-	private String comment = "This is a generic catalog view";
-
-	public GenericCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
-		Map<String, String> properties, String comment) {
-		this(originalQuery, expandedQuery, schema, properties);
-		this.comment = comment;
-	}
+	// Comment of the view
+	private String comment = "This is a hive catalog view.";
+
+	public HiveCatalogView(
+			String originalQuery,
+			String expandedQuery,
+			TableSchema tableSchema,
+			Map<String, String> properties,
+			String comment) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
 
-	public GenericCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
-		Map<String, String> properties) {
 		this.originalQuery = originalQuery;
 		this.expandedQuery = expandedQuery;
-		this.schema = schema;
-		this.properties = properties;
+		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
+		this.properties = checkNotNull(properties, "properties cannot be null");
+		this.comment = comment;
 	}
 
 	@Override
 	public String getOriginalQuery() {
-		return this.originalQuery;
+		return originalQuery;
 	}
 
 	@Override
 	public String getExpandedQuery() {
-		return this.expandedQuery;
+		return expandedQuery;
 	}
 
 	@Override
@@ -72,31 +83,28 @@ public class GenericCatalogView implements CatalogView {
 
 	@Override
 	public TableSchema getSchema() {
-		return schema;
+		return tableSchema;
 	}
 
 	@Override
-	public GenericCatalogView copy() {
-		return new GenericCatalogView(this.originalQuery, this.expandedQuery, schema.copy(),
-			new HashMap<>(this.properties), comment);
+	public String getComment() {
+		return comment;
 	}
 
 	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(comment);
+	public CatalogBaseTable copy() {
+		return new HiveCatalogView(
+			originalQuery, expandedQuery, tableSchema.copy(), new HashMap<>(properties), comment);
 	}
 
 	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a catalog view in an im-memory catalog");
-	}
-
-	public String getComment() {
-		return this.comment;
+	public Optional<String> getDescription() {
+		return Optional.ofNullable(comment);
 	}
 
-	public void setComment(String comment) {
-		this.comment = comment;
+	@Override
+	public Optional<String> getDetailedDescription() {
+		// TODO: return a detailed description
+		return Optional.ofNullable(comment);
 	}
-
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 2f3cd7a..beed72b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -49,46 +49,6 @@ public class HiveCatalogTest extends CatalogTestBase {
 	public void testCreateTable_Streaming() throws Exception {
 	}
 
-	// =====================
-	// HiveCatalog doesn't support view operation yet
-	// Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against HiveCatalog
-	// =====================
-
-	// TODO: re-enable these tests once HiveCatalog support view operations
-
-	public void testListTables() throws Exception {
-	}
-
-	public void testCreateView() throws Exception {
-	}
-
-	public void testCreateView_DatabaseNotExistException() throws Exception {
-	}
-
-	public void testCreateView_TableAlreadyExistException() throws Exception {
-	}
-
-	public void testCreateView_TableAlreadyExist_ignored() throws Exception {
-	}
-
-	public void testDropView() throws Exception {
-	}
-
-	public void testAlterView() throws Exception {
-	}
-
-	public void testAlterView_TableNotExistException() throws Exception {
-	}
-
-	public void testAlterView_TableNotExist_ignored() throws Exception {
-	}
-
-	public void testListView() throws Exception {
-	}
-
-	public void testRenameView() throws Exception {
-	}
-
 	// ------ utils ------
 
 	@Override
@@ -154,14 +114,22 @@ public class HiveCatalogTest extends CatalogTestBase {
 
 	@Override
 	public CatalogView createView() {
-		// TODO: implement this once HiveCatalog support view operations
-		return null;
+		return new HiveCatalogView(
+			String.format("select * from %s", t1),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
+			createTableSchema(),
+			new HashMap<>(),
+			"This is a hive view");
 	}
 
 	@Override
 	public CatalogView createAnotherView() {
-		// TODO: implement this once HiveCatalog support view operations
-		return null;
+		return new HiveCatalogView(
+			String.format("select * from %s", t2),
+			String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
+			createAnotherTableSchema(),
+			new HashMap<>(),
+			"This is another hive view");
 	}
 
 	@Override
@@ -175,4 +143,15 @@ public class HiveCatalogTest extends CatalogTestBase {
 		// thus properties of Hive table is a super set of those in its corresponding Flink table
 		assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
 	}
+
+	protected void checkEquals(CatalogView v1, CatalogView v2) {
+		assertEquals(v1.getSchema(), v2.getSchema()); // must use v2: comparing v1 with itself always passes
+		assertEquals(v1.getComment(), v2.getComment());
+		assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
+		assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
+
+		// Hive may add properties of its own to a view, so the properties of the Hive view (v2)
+		// only need to be a superset of those in its corresponding Flink view (v1).
+		assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
index c067650..4249644 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
@@ -76,6 +76,11 @@ public class GenericCatalogView implements CatalogView {
 	}
 
 	@Override
+	public String getComment() {
+		return comment;
+	}
+
+	@Override
 	public GenericCatalogView copy() {
 		return new GenericCatalogView(this.originalQuery, this.expandedQuery, schema.copy(),
 			new HashMap<>(this.properties), comment);
@@ -90,13 +95,4 @@ public class GenericCatalogView implements CatalogView {
 	public Optional<String> getDetailedDescription() {
 		return Optional.of("This is a catalog view in an im-memory catalog");
 	}
-
-	public String getComment() {
-		return this.comment;
-	}
-
-	public void setComment(String comment) {
-		this.comment = comment;
-	}
-
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index e66525d..5cd91ce 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -451,7 +451,7 @@ public abstract class CatalogTestBase {
 		catalog.createTable(path1, view, false);
 
 		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+		checkEquals(view, (CatalogView) catalog.getTable(path1));
 	}
 
 	@Test
@@ -481,12 +481,12 @@ public abstract class CatalogTestBase {
 		catalog.createTable(path1, view, false);
 
 		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+		checkEquals(view, (CatalogView) catalog.getTable(path1));
 
 		catalog.createTable(path1, createAnotherView(), true);
 
 		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+		checkEquals(view, (CatalogView) catalog.getTable(path1));
 	}
 
 	@Test
@@ -508,13 +508,13 @@ public abstract class CatalogTestBase {
 		CatalogView view = createView();
 		catalog.createTable(path1, view, false);
 
-		CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+		checkEquals(view, (CatalogView) catalog.getTable(path1));
 
 		CatalogView newView = createAnotherView();
 		catalog.alterTable(path1, newView, false);
 
 		assertTrue(catalog.getTable(path1) instanceof CatalogView);
-		CatalogTestUtil.checkEquals(newView, (CatalogView) catalog.getTable(path1));
+		checkEquals(newView, (CatalogView) catalog.getTable(path1));
 	}
 
 	@Test
@@ -673,4 +673,12 @@ public abstract class CatalogTestBase {
 		assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
 		assertEquals(t1.isPartitioned(), t2.isPartitioned());
 	}
+
+	protected void checkEquals(CatalogView v1, CatalogView v2) {
+		assertEquals(v1.getSchema(), v2.getSchema()); // must use v2: comparing v1 with itself always passes
+		assertEquals(v1.getProperties(), v2.getProperties());
+		assertEquals(v1.getComment(), v2.getComment());
+		assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
+		assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 412c73c..4a66e3f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -45,14 +45,6 @@ public class CatalogTestUtil {
 		assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
 	}
 
-	public static void checkEquals(CatalogView v1, CatalogView v2) {
-		assertEquals(v1.getSchema(), v1.getSchema());
-		assertEquals(v1.getProperties(), v2.getProperties());
-		assertEquals(v1.getComment(), v2.getComment());
-		assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
-		assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
-	}
-
 	public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
 		assertEquals(d1.getProperties(), d2.getProperties());
 	}