Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/26 06:30:46 UTC

[flink] 01/04: [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da1ccb9cfae8be0abda7e85d15db3d38304336d2
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon May 25 14:22:50 2020 +0800

    [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic
    
    This closes #12316
---
 .../connectors/hive/HiveDynamicTableFactory.java   | 88 ++++++++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalog.java      |  7 ++
 .../table/catalog/hive/HiveCatalogITCase.java      | 44 ++++++++++-
 3 files changed, 135 insertions(+), 4 deletions(-)
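
For context, the change wires HiveCatalog into the new table factory stack: tables persisted with is_generic=true are resolved through the new HiveDynamicTableFactory (which delegates to FactoryUtil), while real Hive tables keep going through the legacy HiveTableFactory. A minimal usage sketch in Java, mirroring the test added below; the catalog name, default database and hive conf dir are placeholders, not part of this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class GenericTableInHiveCatalogSketch {
	public static void main(String[] args) {
		// placeholder paths: the hive conf dir must contain a valid hive-site.xml
		HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");

		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().inBatchMode().build());
		tEnv.registerCatalog("myhive", hiveCatalog);
		tEnv.useCatalog("myhive");

		// 'connector' = 'print' marks a generic (non-Hive) table; HiveCatalog persists it
		// with is_generic=true and resolves it through the new HiveDynamicTableFactory
		tEnv.executeSql(
				"create table print_table (name String, age Int) with ('connector' = 'print')");
		// executeSql submits the INSERT asynchronously; the returned TableResult exposes the JobClient
		tEnv.executeSql("insert into print_table values ('flink', 1)");
	}
}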

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
new file mode 100644
index 0000000..56086fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.table.catalog.config.CatalogConfig.IS_GENERIC;
+
+/**
+ * A dynamic table factory implementation for the Hive catalog. Currently it only supports generic tables.
+ * Hive tables should be resolved by {@link HiveTableFactory}.
+ */
+public class HiveDynamicTableFactory implements
+		DynamicTableSourceFactory,
+		DynamicTableSinkFactory {
+
+	@Override
+	public String factoryIdentifier() {
+		throw new UnsupportedOperationException("Hive factory is only used through the catalog.");
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		throw new UnsupportedOperationException("Hive factory is only used through the catalog.");
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		throw new UnsupportedOperationException("Hive factory is only used through the catalog.");
+	}
+
+	private static CatalogTable removeIsGenericFlag(CatalogTable table) {
+		Map<String, String> newOptions = new HashMap<>(table.getOptions());
+		boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC));
+		if (!isGeneric) {
+			throw new ValidationException(
+					"Hive dynamic table factory currently only works with generic tables.");
+		}
+		return table.copy(newOptions);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		return FactoryUtil.createTableSink(
+				null, // we are already inside the catalog factory, no catalog lookup needed
+				context.getObjectIdentifier(),
+				removeIsGenericFlag(context.getCatalogTable()),
+				context.getConfiguration(),
+				context.getClassLoader());
+	}
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		return FactoryUtil.createTableSource(
+				null, // we are already inside the catalog factory, no catalog lookup needed
+				context.getObjectIdentifier(),
+				removeIsGenericFlag(context.getCatalogTable()),
+				context.getConfiguration(),
+				context.getClassLoader());
+	}
+}
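
For illustration only (option names and values hypothetical, property key taken from CatalogConfig.IS_GENERIC), this is what removeIsGenericFlag above does to the options HiveCatalog persists for a generic table:

	// stored options of a hypothetical generic table, as persisted by HiveCatalog
	Map<String, String> stored = new HashMap<>();
	stored.put("is_generic", "true");   // value of CatalogConfig.IS_GENERIC
	stored.put("connector", "print");

	Map<String, String> options = new HashMap<>(stored);
	boolean isGeneric = Boolean.parseBoolean(options.remove("is_generic"));
	// isGeneric == true and options == {connector=print}; the stripped options are what
	// FactoryUtil receives. If is_generic is absent or false (a real Hive table), the
	// factory throws a ValidationException, since Hive tables stay with HiveTableFactory.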
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 5bc469f..d10127f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
@@ -69,6 +70,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.Preconditions;
@@ -238,6 +240,11 @@ public class HiveCatalog extends AbstractCatalog {
 	}
 
 	@Override
+	public Optional<Factory> getFactory() {
+		return Optional.of(new HiveDynamicTableFactory());
+	}
+
+	@Override
 	public Optional<TableFactory> getTableFactory() {
 		return Optional.of(new HiveTableFactory(hiveConf));
 	}
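 
Side note: after this change the catalog exposes both factory hooks, getFactory() for the new DynamicTableSource/DynamicTableSink stack and getTableFactory() for the legacy one. A rough sketch of how a caller such as FactoryUtil.createTableSource (when passed a non-null catalog, unlike the call above) can pick the catalog-provided factory instead of going through service discovery; hiveCatalog is an assumed existing instance:

	Optional<Factory> factory = hiveCatalog.getFactory();
	if (factory.isPresent() && factory.get() instanceof DynamicTableSourceFactory) {
		// prefer the factory supplied by the catalog (HiveDynamicTableFactory here)
		DynamicTableSourceFactory sourceFactory = (DynamicTableSourceFactory) factory.get();
	}
	// the legacy hook still returns HiveTableFactory for old-style TableSource/TableSink
	Optional<TableFactory> legacyFactory = hiveCatalog.getTableFactory();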
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 4064057..2817361 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -52,8 +51,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileReader;
+import java.io.PrintStream;
 import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -63,6 +64,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -237,7 +239,7 @@ public class HiveCatalogITCase {
 		// similar to CatalogTableITCase::testReadWriteCsvUsingDDL but uses HiveCatalog
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
-		tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
 		tableEnv.registerCatalog("myhive", hiveCatalog);
 		tableEnv.useCatalog("myhive");
@@ -282,8 +284,7 @@ public class HiveCatalogITCase {
 		}
 		EnvironmentSettings settings = builder.build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
-		tableEnv.getConfig().getConfiguration().setInteger(
-				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
 		tableEnv.registerCatalog("myhive", hiveCatalog);
 		tableEnv.useCatalog("myhive");
@@ -307,4 +308,39 @@ public class HiveCatalogITCase {
 		Assert.assertEquals(5, rows.size());
 		tableEnv.executeSql("DROP TABLE proctime_src");
 	}
+
+	@Test
+	public void testNewTableFactory() {
+		TableEnvironment tEnv = TableEnvironment.create(
+				EnvironmentSettings.newInstance().inBatchMode().build());
+		tEnv.registerCatalog("myhive", hiveCatalog);
+		tEnv.useCatalog("myhive");
+		tEnv.getConfig().getConfiguration().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+		String path = this.getClass().getResource("/csv/test.csv").getPath();
+
+		PrintStream originalSystemOut = System.out;
+		try {
+			ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+			System.setOut(new PrintStream(arrayOutputStream));
+
+			tEnv.executeSql("create table csv_table (name String, age Int) with (" +
+					"'connector.type' = 'filesystem'," +
+					"'connector.path' = 'file://" + path + "'," +
+					"'format.type' = 'csv')");
+			tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
+
+			TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into print_table select * from csv_table");
+
+			// assert query result
+			assertEquals("+I(1,1)\n+I(2,2)\n+I(3,3)\n", arrayOutputStream.toString());
+		} finally {
+			if (System.out != originalSystemOut) {
+				System.out.close();
+			}
+			System.setOut(originalSystemOut);
+			tEnv.executeSql("DROP TABLE csv_table");
+			tEnv.executeSql("DROP TABLE print_table");
+		}
+	}
 }