You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/14 23:49:03 UTC
[flink] branch master updated: [FLINK-12234][hive] Support view
related operations in HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 544903d [FLINK-12234][hive] Support view related operations in HiveCatalog
544903d is described below
commit 544903d5cc77d6f90e0d77cb21ebf008b435e444
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon May 13 16:48:27 2019 -0700
[FLINK-12234][hive] Support view related operations in HiveCatalog
This PR supports view related operations in HiveCatalog and creates HiveCatalogView.
This closes #8434.
---
.../catalog/hive/GenericHiveMetastoreCatalog.java | 2 +-
.../flink/table/catalog/hive/HiveCatalog.java | 33 ++++++++--
.../flink/table/catalog/hive/HiveCatalogView.java | 72 ++++++++++++----------
.../flink/table/catalog/hive/HiveCatalogTest.java | 67 +++++++-------------
.../flink/table/catalog/GenericCatalogView.java | 14 ++---
.../flink/table/catalog/CatalogTestBase.java | 18 ++++--
.../flink/table/catalog/CatalogTestUtil.java | 8 ---
7 files changed, 109 insertions(+), 105 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 8558576..b5dfb9c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -198,7 +198,7 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
hiveTable.setViewExpandedText(view.getExpandedQuery());
hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
- throw new IllegalArgumentException(
+ throw new CatalogException(
"GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 0619aa3..66826eb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -40,6 +39,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -98,8 +98,7 @@ public class HiveCatalog extends HiveCatalogBase {
@Override
protected void validateCatalogBaseTable(CatalogBaseTable table)
throws CatalogException {
- // TODO: validate HiveCatalogView
- if (!(table instanceof HiveCatalogTable)) {
+ if (!(table instanceof HiveCatalogTable) && !(table instanceof HiveCatalogView)) {
throw new CatalogException(
"HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
}
@@ -126,7 +125,18 @@ public class HiveCatalog extends HiveCatalogBase {
.collect(Collectors.toList());
}
- return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
+ if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+ return new HiveCatalogView(
+ hiveTable.getViewOriginalText(),
+ hiveTable.getViewExpandedText(),
+ tableSchema,
+ properties,
+ comment
+ );
+ } else {
+ return new HiveCatalogTable(
+ tableSchema, partitionKeys, properties, comment);
+ }
}
@Override
@@ -153,7 +163,7 @@ public class HiveCatalog extends HiveCatalogBase {
List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
// Table columns and partition keys
- if (table instanceof CatalogTable) {
+ if (table instanceof HiveCatalogTable) {
HiveCatalogTable catalogTable = (HiveCatalogTable) table;
if (catalogTable.isPartitioned()) {
@@ -167,8 +177,19 @@ public class HiveCatalog extends HiveCatalogBase {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
+ } else if (table instanceof HiveCatalogView) {
+ HiveCatalogView view = (HiveCatalogView) table;
+
+ // TODO: [FLINK-12398] Support partitioned view in catalog API
+ sd.setCols(allColumns);
+ hiveTable.setPartitionKeys(new ArrayList<>());
+
+ hiveTable.setViewOriginalText(view.getOriginalQuery());
+ hiveTable.setViewExpandedText(view.getExpandedQuery());
+ hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
- throw new UnsupportedOperationException("HiveCatalog doesn't support view yet");
+ throw new CatalogException(
+ "HiveCatalog only supports HiveCatalogTable and HiveCatalogView");
}
hiveTable.setSd(sd);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
similarity index 55%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
index c067650..968bfd8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
@@ -16,18 +16,24 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * A generic catalog view implementation.
+ * A Hive catalog view implementation.
*/
-public class GenericCatalogView implements CatalogView {
+public class HiveCatalogView implements CatalogView {
// Original text of the view definition.
private final String originalQuery;
@@ -37,32 +43,37 @@ public class GenericCatalogView implements CatalogView {
// Expanded query text takes care of this, as an example.
private final String expandedQuery;
- private final TableSchema schema;
+ // Schema of the view (column names and types)
+ private final TableSchema tableSchema;
+ // Properties of the view
private final Map<String, String> properties;
- private String comment = "This is a generic catalog view";
-
- public GenericCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
- Map<String, String> properties, String comment) {
- this(originalQuery, expandedQuery, schema, properties);
- this.comment = comment;
- }
+ // Comment of the view
+ private String comment = "This is a hive catalog view.";
+
+ public HiveCatalogView(
+ String originalQuery,
+ String expandedQuery,
+ TableSchema tableSchema,
+ Map<String, String> properties,
+ String comment) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
- public GenericCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
- Map<String, String> properties) {
this.originalQuery = originalQuery;
this.expandedQuery = expandedQuery;
- this.schema = schema;
- this.properties = properties;
+ this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
+ this.properties = checkNotNull(properties, "properties cannot be null");
+ this.comment = comment;
}
@Override
public String getOriginalQuery() {
- return this.originalQuery;
+ return originalQuery;
}
@Override
public String getExpandedQuery() {
- return this.expandedQuery;
+ return expandedQuery;
}
@Override
@@ -72,31 +83,28 @@ public class GenericCatalogView implements CatalogView {
@Override
public TableSchema getSchema() {
- return schema;
+ return tableSchema;
}
@Override
- public GenericCatalogView copy() {
- return new GenericCatalogView(this.originalQuery, this.expandedQuery, schema.copy(),
- new HashMap<>(this.properties), comment);
+ public String getComment() {
+ return comment;
}
@Override
- public Optional<String> getDescription() {
- return Optional.of(comment);
+ public CatalogBaseTable copy() {
+ return new HiveCatalogView(
+ originalQuery, expandedQuery, tableSchema.copy(), new HashMap<>(properties), comment);
}
@Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a catalog view in an im-memory catalog");
- }
-
- public String getComment() {
- return this.comment;
+ public Optional<String> getDescription() {
+ return Optional.ofNullable(comment);
}
- public void setComment(String comment) {
- this.comment = comment;
+ @Override
+ public Optional<String> getDetailedDescription() {
+ // TODO: return a detailed description
+ return Optional.ofNullable(comment);
}
-
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 2f3cd7a..beed72b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -49,46 +49,6 @@ public class HiveCatalogTest extends CatalogTestBase {
public void testCreateTable_Streaming() throws Exception {
}
- // =====================
- // HiveCatalog doesn't support view operation yet
- // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against HiveCatalog
- // =====================
-
- // TODO: re-enable these tests once HiveCatalog support view operations
-
- public void testListTables() throws Exception {
- }
-
- public void testCreateView() throws Exception {
- }
-
- public void testCreateView_DatabaseNotExistException() throws Exception {
- }
-
- public void testCreateView_TableAlreadyExistException() throws Exception {
- }
-
- public void testCreateView_TableAlreadyExist_ignored() throws Exception {
- }
-
- public void testDropView() throws Exception {
- }
-
- public void testAlterView() throws Exception {
- }
-
- public void testAlterView_TableNotExistException() throws Exception {
- }
-
- public void testAlterView_TableNotExist_ignored() throws Exception {
- }
-
- public void testListView() throws Exception {
- }
-
- public void testRenameView() throws Exception {
- }
-
// ------ utils ------
@Override
@@ -154,14 +114,22 @@ public class HiveCatalogTest extends CatalogTestBase {
@Override
public CatalogView createView() {
- // TODO: implement this once HiveCatalog support view operations
- return null;
+ return new HiveCatalogView(
+ String.format("select * from %s", t1),
+ String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
+ createTableSchema(),
+ new HashMap<>(),
+ "This is a hive view");
}
@Override
public CatalogView createAnotherView() {
- // TODO: implement this once HiveCatalog support view operations
- return null;
+ return new HiveCatalogView(
+ String.format("select * from %s", t2),
+ String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
+ createAnotherTableSchema(),
+ new HashMap<>(),
+ "This is another hive view");
}
@Override
@@ -175,4 +143,15 @@ public class HiveCatalogTest extends CatalogTestBase {
// thus properties of Hive table is a super set of those in its corresponding Flink table
assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
}
+
+ /**
+  * Asserts that the view returned by the catalog matches the view the test created.
+  *
+  * @param v1 the expected view, as created by the test
+  * @param v2 the actual view, as read back from the catalog
+  */
+ @Override
+ protected void checkEquals(CatalogView v1, CatalogView v2) {
+ // Compare v1 against v2; comparing v1 with itself would always pass
+ assertEquals(v1.getSchema(), v2.getSchema());
+ assertEquals(v1.getComment(), v2.getComment());
+ assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
+ assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
+
+ // Hive may add properties of its own to a view,
+ // thus the properties of a Hive view are a superset of those in its corresponding Flink view
+ assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
index c067650..4249644 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
@@ -76,6 +76,11 @@ public class GenericCatalogView implements CatalogView {
}
@Override
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
public GenericCatalogView copy() {
return new GenericCatalogView(this.originalQuery, this.expandedQuery, schema.copy(),
new HashMap<>(this.properties), comment);
@@ -90,13 +95,4 @@ public class GenericCatalogView implements CatalogView {
public Optional<String> getDetailedDescription() {
return Optional.of("This is a catalog view in an im-memory catalog");
}
-
- public String getComment() {
- return this.comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index e66525d..5cd91ce 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -451,7 +451,7 @@ public abstract class CatalogTestBase {
catalog.createTable(path1, view, false);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+ checkEquals(view, (CatalogView) catalog.getTable(path1));
}
@Test
@@ -481,12 +481,12 @@ public abstract class CatalogTestBase {
catalog.createTable(path1, view, false);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+ checkEquals(view, (CatalogView) catalog.getTable(path1));
catalog.createTable(path1, createAnotherView(), true);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+ checkEquals(view, (CatalogView) catalog.getTable(path1));
}
@Test
@@ -508,13 +508,13 @@ public abstract class CatalogTestBase {
CatalogView view = createView();
catalog.createTable(path1, view, false);
- CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
+ checkEquals(view, (CatalogView) catalog.getTable(path1));
CatalogView newView = createAnotherView();
catalog.alterTable(path1, newView, false);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- CatalogTestUtil.checkEquals(newView, (CatalogView) catalog.getTable(path1));
+ checkEquals(newView, (CatalogView) catalog.getTable(path1));
}
@Test
@@ -673,4 +673,12 @@ public abstract class CatalogTestBase {
assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
assertEquals(t1.isPartitioned(), t2.isPartitioned());
}
+
+ /**
+  * Asserts that two catalog views have equal schema, properties, comment and queries.
+  *
+  * <p>Declared protected and non-static so that catalog-specific tests can override the comparison.
+  *
+  * @param v1 the expected view
+  * @param v2 the actual view
+  */
+ protected void checkEquals(CatalogView v1, CatalogView v2) {
+ // Compare v1 against v2; comparing v1 with itself would always pass
+ assertEquals(v1.getSchema(), v2.getSchema());
+ assertEquals(v1.getProperties(), v2.getProperties());
+ assertEquals(v1.getComment(), v2.getComment());
+ assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
+ assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
+ }
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 412c73c..4a66e3f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -45,14 +45,6 @@ public class CatalogTestUtil {
assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
}
- public static void checkEquals(CatalogView v1, CatalogView v2) {
- assertEquals(v1.getSchema(), v1.getSchema());
- assertEquals(v1.getProperties(), v2.getProperties());
- assertEquals(v1.getComment(), v2.getComment());
- assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
- assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
- }
-
public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
assertEquals(d1.getProperties(), d2.getProperties());
}