You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/02/25 12:29:43 UTC

[flink] branch master updated: [FLINK-16264][table] Fix ConnectTableDescriptor loose time attribute bug (#11204)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd288c9  [FLINK-16264][table] Fix ConnectTableDescriptor loose time attribute bug (#11204)
dd288c9 is described below

commit dd288c958861105b0ffc0966a8d32c1188342649
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Feb 25 20:29:22 2020 +0800

    [FLINK-16264][table] Fix ConnectTableDescriptor loose time attribute bug (#11204)
    
    In CatalogTableImpl.fromProperties, we can not remove all schema.* for TableSchema, because Schema (it is a descriptor) is not same to TableSchema. Schema contains time attribute, so we need keep them in properties.
---
 .../client/gateway/local/ExecutionContextTest.java | 10 ++++
 .../gateway/utils/TestTableSourceFactoryBase.java  |  2 +-
 .../flink/table/catalog/CatalogTableImpl.java      | 21 ++++++--
 .../table/descriptors/ConnectTableDescriptor.java  |  7 ---
 .../descriptors/ConnectTableDescriptorTest.java    | 56 ++++++++++++++++++++++
 .../table/descriptors/DescriptorProperties.java    | 19 --------
 6 files changed, 83 insertions(+), 32 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 167fdf9..bb5b4bd 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -37,6 +38,9 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.commons.cli.Options;
@@ -214,6 +218,12 @@ public class ExecutionContextTest {
 		assertArrayEquals(
 			new String[]{"integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0"},
 			tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames());
+
+		// Please delete this test after removing registerTableSource in SQL-CLI.
+		TableSchema tableSchema = tableEnv.from("EnrichmentSource").getSchema();
+		LogicalType timestampType = tableSchema.getFieldDataTypes()[2].getLogicalType();
+		assertTrue(timestampType instanceof TimestampType);
+		assertEquals(TimestampKind.ROWTIME, ((TimestampType) timestampType).getKind());
 	}
 
 	@Test
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index f305b90..980cb74 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -100,7 +100,7 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
 	public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
 		TableSchema schema = context.getTable().getSchema();
 		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(context.getTable().getProperties());
+		params.putProperties(context.getTable().toProperties());
 		final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
 		final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
 		return new TestTableSource(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 9808b32..6b61981 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -86,14 +86,25 @@ public class CatalogTableImpl extends AbstractCatalogTable {
 	 * Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
 	 */
 	public static CatalogTableImpl fromProperties(Map<String, String> properties) {
-		DescriptorProperties descriptorProperties = new DescriptorProperties();
-		descriptorProperties.putProperties(properties);
-		TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA);
-		descriptorProperties.removeKeyPrefix(Schema.SCHEMA);
+		TableSchema tableSchema = extractTableSchema(properties);
 		return new CatalogTableImpl(
 				tableSchema,
-				descriptorProperties.asMap(),
+				removeTableSchema(properties, tableSchema),
 				""
 		);
 	}
+
+	private static TableSchema extractTableSchema(Map<String, String> properties) {
+		DescriptorProperties descriptorProperties = new DescriptorProperties();
+		descriptorProperties.putProperties(properties);
+		return descriptorProperties.getTableSchema(Schema.SCHEMA);
+	}
+
+	private static Map<String, String> removeTableSchema(Map<String, String> properties, TableSchema schema) {
+		Map<String, String> ret = new HashMap<>(properties);
+		DescriptorProperties descriptorProperties = new DescriptorProperties();
+		descriptorProperties.putTableSchema(Schema.SCHEMA, schema);
+		descriptorProperties.asMap().keySet().forEach(ret::remove);
+		return ret;
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index a7c54c7..f9798f0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.descriptors;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.Registration;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.util.Preconditions;
@@ -82,12 +81,6 @@ public abstract class ConnectTableDescriptor
 		registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties()));
 	}
 
-	private TableSchema getTableSchema(Map<String, String> schemaProperties) {
-		DescriptorProperties properties = new DescriptorProperties();
-		properties.putProperties(schemaProperties);
-		return properties.getTableSchema(Schema.SCHEMA);
-	}
-
 	@Override
 	protected Map<String, String> additionalProperties() {
 		if (schemaDescriptor != null) {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptors/ConnectTableDescriptorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptors/ConnectTableDescriptorTest.java
new file mode 100644
index 0000000..11b8fbd
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptors/ConnectTableDescriptorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.Registration;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test for {@link ConnectTableDescriptor}.
+ */
+public class ConnectTableDescriptorTest {
+
+	@Test
+	public void testProperties() {
+		AtomicReference<CatalogTableImpl> reference = new AtomicReference<>();
+		Registration registration = (path, table) -> reference.set((CatalogTableImpl) table);
+		ConnectTableDescriptor descriptor = new StreamTableDescriptor(
+				registration, new FileSystem().path("myPath"))
+				.withFormat(new FormatDescriptor("myFormat", 1) {
+					@Override
+					protected Map<String, String> toFormatProperties() {
+						return new HashMap<>();
+					}
+				})
+				.withSchema(new Schema()
+						.field("f0", DataTypes.INT())
+						.rowtime(new Rowtime().timestampsFromField("f0")));
+		descriptor.createTemporaryTable("myTable");
+
+		Assert.assertEquals(descriptor.toProperties(), reference.get().toProperties());
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 1f17519..f299db4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -44,7 +44,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -145,24 +144,6 @@ public class DescriptorProperties {
 	}
 
 	/**
-	 * Removes the mapping for a key prefix from this properties if it is present.
-	 *
-	 * <p>For example: for prefix "flink", the kvs in properties like key "flink.k" and
-	 * value "v" will be removed.
-	 */
-	public void removeKeyPrefix(String prefix) {
-		checkNotNull(prefix);
-
-		Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
-		while (iterator.hasNext()) {
-			String key = iterator.next().getKey();
-			if (key.startsWith(prefix)) {
-				iterator.remove();
-			}
-		}
-	}
-
-	/**
 	 * Adds a class under the given key.
 	 */
 	public void putClass(String key, Class<?> clazz) {