You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/26 06:30:46 UTC
[flink] 01/04: [FLINK-17896][hive] HiveCatalog can work with new
table factory because of is_generic
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit da1ccb9cfae8be0abda7e85d15db3d38304336d2
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon May 25 14:22:50 2020 +0800
[FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic
This closes #12316
---
.../connectors/hive/HiveDynamicTableFactory.java | 88 ++++++++++++++++++++++
.../flink/table/catalog/hive/HiveCatalog.java | 7 ++
.../table/catalog/hive/HiveCatalogITCase.java | 44 ++++++++++-
3 files changed, 135 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
new file mode 100644
index 0000000..56086fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.table.catalog.config.CatalogConfig.IS_GENERIC;
+
+/**
+ * A dynamic table factory implementation for Hive catalog. Currently it only supports generic tables.
+ * Hive tables should be resolved by {@link HiveTableFactory}.
+ */
+public class HiveDynamicTableFactory implements
+ DynamicTableSourceFactory,
+ DynamicTableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+ }
+
+ private static CatalogTable removeIsGenericFlag(CatalogTable table) {
+ Map<String, String> newOptions = new HashMap<>(table.getOptions()); // work on a copy of the options
+ boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC)); // a missing flag parses as false
+ if (!isGeneric) {
+ throw new ValidationException(
+ "Hive dynamic table factory now only work for generic table.");
+ }
+ return table.copy(newOptions); // same table, with the is_generic flag stripped from its options
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ return FactoryUtil.createTableSink(
+ null, // we are already in the factory of the catalog
+ context.getObjectIdentifier(),
+ removeIsGenericFlag(context.getCatalogTable()),
+ context.getConfiguration(),
+ context.getClassLoader());
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ return FactoryUtil.createTableSource(
+ null, // we are already in the factory of the catalog
+ context.getObjectIdentifier(),
+ removeIsGenericFlag(context.getCatalogTable()),
+ context.getConfiguration(),
+ context.getClassLoader());
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 5bc469f..d10127f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.connectors.hive.HiveTableFactory;
import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
@@ -69,6 +70,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.Preconditions;
@@ -238,6 +240,11 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new HiveDynamicTableFactory());
+ }
+
+ @Override
public Optional<TableFactory> getTableFactory() {
return Optional.of(new HiveTableFactory(hiveConf));
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 4064057..2817361 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableBuilder;
import org.apache.flink.table.catalog.ObjectPath;
@@ -52,8 +51,10 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
+import java.io.PrintStream;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -63,6 +64,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -237,7 +239,7 @@ public class HiveCatalogITCase {
// similar to CatalogTableITCase::testReadWriteCsvUsingDDL but uses HiveCatalog
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
- tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
@@ -282,8 +284,7 @@ public class HiveCatalogITCase {
}
EnvironmentSettings settings = builder.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
- tableEnv.getConfig().getConfiguration().setInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
@@ -307,4 +308,39 @@ public class HiveCatalogITCase {
Assert.assertEquals(5, rows.size());
tableEnv.executeSql("DROP TABLE proctime_src");
}
+
+ @Test
+ public void testNewTableFactory() {
+ TableEnvironment tEnv = TableEnvironment.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.registerCatalog("myhive", hiveCatalog);
+ tEnv.useCatalog("myhive");
+ tEnv.getConfig().getConfiguration().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ String path = this.getClass().getResource("/csv/test.csv").getPath();
+
+ PrintStream originalSystemOut = System.out; // restored in the finally block
+ try {
+ ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(arrayOutputStream)); // capture stdout for the assertion below
+
+ tEnv.executeSql("create table csv_table (name String, age Int) with (" +
+ "'connector.type' = 'filesystem'," +
+ "'connector.path' = 'file://" + path + "'," +
+ "'format.type' = 'csv')");
+ tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
+
+ TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into print_table select * from csv_table");
+
+ // everything written to the captured stdout must be exactly the three inserted rows
+ assertEquals("+I(1,1)\n+I(2,2)\n+I(3,3)\n", arrayOutputStream.toString());
+ } finally {
+ if (System.out != originalSystemOut) {
+ System.out.close();
+ }
+ System.setOut(originalSystemOut);
+ tEnv.executeSql("DROP TABLE IF EXISTS csv_table"); // IF EXISTS: creation may have failed; don't mask that error
+ tEnv.executeSql("DROP TABLE IF EXISTS print_table");
+ }
+ }
}