You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/30 09:32:19 UTC

[flink] 05/06: [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1db13f5cfb3b81409d6f6fb079424f44ccc826e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 24 11:21:00 2019 +0200

    [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client
    
    This closes #9229.
---
 .../client/gateway/local/ExecutionContext.java     |  28 ++---
 .../table/client/gateway/local/LocalExecutor.java  |   7 +-
 .../client/gateway/local/LocalExecutorITCase.java  |  23 +++-
 .../client/gateway/utils/SimpleCatalogFactory.java | 118 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../test/resources/test-sql-client-defaults.yaml   |   3 +
 .../flink/table/api/EnvironmentSettings.java       |   7 +-
 7 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0df7fba..ae7fb2c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -302,20 +302,6 @@ public class ExecutionContext<T> {
 			// register catalogs
 			catalogs.forEach(tableEnv::registerCatalog);
 
-			// set current catalog
-			if (sessionContext.getCurrentCatalog().isPresent()) {
-				tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
-			} else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
-				tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
-			}
-
-			// set current database
-			if (sessionContext.getCurrentDatabase().isPresent()) {
-				tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
-			} else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
-				tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
-			}
-
 			// create query config
 			queryConfig = createQueryConfig();
 
@@ -340,6 +326,20 @@ public class ExecutionContext<T> {
 					registerTemporalTable(temporalTableEntry);
 				}
 			});
+
+			// set current catalog
+			if (sessionContext.getCurrentCatalog().isPresent()) {
+				tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
+			} else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
+				tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
+			}
+
+			// set current database
+			if (sessionContext.getCurrentDatabase().isPresent()) {
+				tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
+			} else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
+				tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
+			}
 		}
 
 		public QueryConfig getQueryConfig() {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 7e41f1f..101f72c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
@@ -473,7 +474,11 @@ public class LocalExecutor implements Executor {
 			// writing to a sink requires an optimization step that might reference UDFs during code compilation
 			context.wrapClassLoader(() -> {
 				envInst.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
-				table.insertInto(jobName, envInst.getQueryConfig());
+				table.insertInto(
+					envInst.getQueryConfig(),
+					EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+					EnvironmentSettings.DEFAULT_BUILTIN_DATABASE,
+					jobName);
 				return null;
 			});
 			jobGraph = envInst.createJobGraph(jobName);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index ac1a7ae..5dbec41 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+import org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
@@ -63,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -160,7 +162,8 @@ public class LocalExecutorITCase extends TestLogger {
 
 		final List<String> expectedCatalogs = Arrays.asList(
 			"default_catalog",
-			"catalog1");
+			"catalog1",
+			"simple-catalog");
 		assertEquals(expectedCatalogs, actualCatalogs);
 	}
 
@@ -402,7 +405,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final SessionContext session = new SessionContext("test-session", new Environment());
 
 		try {
-			// start job
+			// Case 1: Registered sink
 			final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
 				session,
 				"INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1");
@@ -424,6 +427,22 @@ public class LocalExecutorITCase extends TestLogger {
 						fail("Unexpected job status.");
 				}
 			}
+
+			// Case 2: Temporary sink
+			session.setCurrentCatalog("simple-catalog");
+			session.setCurrentDatabase("default_database");
+			// all queries are pipelined to an in-memory sink, check it is properly registered
+			final ResultDescriptor otherCatalogDesc = executor.executeQuery(session, "SELECT * FROM `test-table`");
+
+			final List<String> otherCatalogResults = retrieveTableResult(
+				executor,
+				session,
+				otherCatalogDesc.getResultId());
+
+			TestBaseUtils.compareResultCollections(
+				SimpleCatalogFactory.TABLE_CONTENTS.stream().map(Row::toString).collect(Collectors.toList()),
+				otherCatalogResults,
+				Comparator.naturalOrder());
 		} finally {
 			executor.stop(session);
 		}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
new file mode 100644
index 0000000..5f533fb
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ConnectorCatalogTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Catalog factory for tests: creates a {@link GenericInMemoryCatalog} holding a single non-empty source table.
+ * The table's rows are {@link SimpleCatalogFactory#TABLE_CONTENTS}; its name defaults to {@code "test-table"}.
+ */
+public class SimpleCatalogFactory implements CatalogFactory {
+
+	public static final String CATALOG_TYPE_VALUE = "simple-catalog";
+
+	public static final String TEST_TABLE_NAME = "test-table";
+
+	public static final List<Row> TABLE_CONTENTS = Arrays.asList(
+		Row.of(1, "Hello"),
+		Row.of(2, "Hello world"),
+		Row.of(3, "Hello world! Hello!")
+	);
+
+	@Override
+	public Catalog createCatalog(String name, Map<String, String> properties) {
+		String database = properties.getOrDefault(
+			CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
+			"default_database");
+		GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);
+
+		String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME); // same constant is both the property key and the fallback table name
+		StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
+			@Override
+			public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+				return execEnv.fromCollection(TABLE_CONTENTS)
+					.returns(new RowTypeInfo(
+						new TypeInformation[]{Types.INT(), Types.STRING()},
+						new String[]{"id", "string"}));
+			}
+
+			@Override
+			public TableSchema getTableSchema() {
+				return TableSchema.builder()
+					.field("id", DataTypes.INT())
+					.field("string", DataTypes.STRING())
+					.build();
+			}
+
+			@Override
+			public DataType getProducedDataType() {
+				return DataTypes.ROW(
+					DataTypes.FIELD("id", DataTypes.INT()),
+					DataTypes.FIELD("string", DataTypes.STRING())
+				);
+			}
+		};
+
+		try {
+			genericInMemoryCatalog.createTable(
+				new ObjectPath(database, tableName),
+				ConnectorCatalogTable.source(tableSource, false),
+				false
+			);
+		} catch (Exception e) {
+			throw new WrappingRuntimeException(e);
+		}
+
+		return genericInMemoryCatalog;
+	}
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CatalogDescriptorValidator.CATALOG_TYPE, CATALOG_TYPE_VALUE);
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		return Arrays.asList(CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, TEST_TABLE_NAME);
+	}
+}
diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index b4e3095..5ba6b0b 100644
--- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -15,5 +15,6 @@
 
 org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory
 org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory
+org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 9e0582b..9844d54 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -137,3 +137,6 @@ deployment:
 catalogs:
   - name: catalog1
     type: DependencyTest
+  - name: simple-catalog
+    type: simple-catalog
+    test-table: test-table
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index bfd9203..7eec4a3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -46,8 +46,11 @@ import java.util.Map;
  */
 @PublicEvolving
 public class EnvironmentSettings {
+
 	public static final String STREAMING_MODE = "streaming-mode";
 	public static final String CLASS_NAME = "class-name";
+	public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
+	public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
 
 	/**
 	 * Canonical name of the {@link Planner} class to use.
@@ -158,8 +161,8 @@ public class EnvironmentSettings {
 
 		private String plannerClass = OLD_PLANNER_FACTORY;
 		private String executorClass = OLD_EXECUTOR_FACTORY;
-		private String builtInCatalogName = "default_catalog";
-		private String builtInDatabaseName = "default_database";
+		private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
+		private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
 		private boolean isStreamingMode = true;
 
 		/**