Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/25 06:02:38 UTC

flink git commit: [FLINK-9934] [table] Fix invalid field mapping by Kafka table source factory

Repository: flink
Updated Branches:
  refs/heads/master 3f2232668 -> 378cbb7c2


[FLINK-9934] [table] Fix invalid field mapping by Kafka table source factory

According to the DefinedFieldMapping interface, the field mapping may also contain
the input fields. However, the Kafka table source factory was calling
SchemaValidator#deriveFieldMapping with its own table schema instead of the input type
produced by the deserialization schema.

This closes #6403.
This closes #3124.
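
For illustration, here is a minimal sketch of the mapping that deriveFieldMapping is now
expected to produce. The field names are hypothetical and simply mirror the updated tests
below (a format producing the physical fields name, count and time, and a table schema
that exposes name as fruit-name and defines a rowtime attribute on time):

import java.util.HashMap;
import java.util.Map;

public class FieldMappingSketch {

    public static void main(String[] args) {
        // Logical schema field -> physical input field, as described by DefinedFieldMapping.
        // After the fix the mapping also contains the input fields mapped to themselves,
        // so a rowtime attribute can still be resolved against a physical field.
        Map<String, String> fieldMapping = new HashMap<>();
        fieldMapping.put("fruit-name", "name"); // schema field renamed from input field 'name'
        fieldMapping.put("name", "name");       // physical input field, kept in the mapping
        fieldMapping.put("count", "count");     // implicit 1:1 mapping
        fieldMapping.put("time", "time");       // physical field referenced by the rowtime attribute

        System.out.println(fieldMapping.size()); // 4 entries instead of the previous 2
    }
}

Previously the mapping was derived from the factory's own table schema, so physical input
fields such as name and time that are only referenced through a rename or a rowtime
attribute were missing from it.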


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/378cbb7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/378cbb7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/378cbb7c

Branch: refs/heads/master
Commit: 378cbb7c2e580ba73f215234e7dff542c3e2bc97
Parents: 3f22326
Author: Timo Walther <tw...@apache.org>
Authored: Tue Jul 24 11:40:36 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Jul 25 08:01:07 2018 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTableSourceSinkFactoryBase.java  | 14 ++++++++-----
 .../KafkaJsonTableSourceFactoryTestBase.java    |  5 +++++
 .../KafkaTableSourceSinkFactoryTestBase.java    |  5 +++++
 .../table/descriptors/SchemaValidator.scala     | 21 +++++++++++++-------
 .../table/descriptors/SchemaValidatorTest.scala |  4 ++--
 5 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 3307994..27b2e67 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -132,18 +132,20 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
 		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
 
-		final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
 		final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+		final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
 		final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
 
 		return createKafkaTableSource(
-			schema,
+			descriptorProperties.getTableSchema(SCHEMA()),
 			SchemaValidator.deriveProctimeAttribute(descriptorProperties),
 			SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
-			SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)),
+			SchemaValidator.deriveFieldMapping(
+				descriptorProperties,
+				Optional.of(deserializationSchema.getProducedType())),
 			topic,
 			getKafkaProperties(descriptorProperties),
-			getDeserializationSchema(properties),
+			deserializationSchema,
 			startupOptions.startupMode,
 			startupOptions.specificOffsets);
 	}
@@ -318,7 +320,9 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 	}
 
 	private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
-		final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema));
+		final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(
+			descriptorProperties,
+			Optional.of(schema.toRowType())); // until FLINK-9870 is fixed we assume that the table schema is the output type
 		return fieldMapping.size() != schema.getColumnNames().length ||
 			!fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue()));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 20da156..51c0e7b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.descriptors.TestTableDescriptor;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceUtil;
 import org.apache.flink.table.sources.tsextractors.ExistingField;
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
 
@@ -101,7 +102,9 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 
 		final Map<String, String> tableJsonMapping = new HashMap<>();
 		tableJsonMapping.put("fruit-name", "name");
+		tableJsonMapping.put("name", "name");
 		tableJsonMapping.put("count", "count");
+		tableJsonMapping.put("time", "time");
 
 		final Properties props = new Properties();
 		props.put("group.id", "test-group");
@@ -129,6 +132,8 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 				.withRowtimeAttribute("event-time", new ExistingField("time"), new AscendingTimestamps())
 				.build();
 
+		TableSourceUtil.validateTableSource(builderSource);
+
 		// construct table source using descriptors and table source factory
 
 		final Map<Integer, Long> offsets = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index d8e8f7d..504bed1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.factories.utils.TestTableFormat;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceUtil;
 import org.apache.flink.table.sources.tsextractors.ExistingField;
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
 import org.apache.flink.types.Row;
@@ -115,7 +116,9 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 
 		final Map<String, String> fieldMapping = new HashMap<>();
 		fieldMapping.put(FRUIT_NAME, NAME);
+		fieldMapping.put(NAME, NAME);
 		fieldMapping.put(COUNT, COUNT);
+		fieldMapping.put(TIME, TIME);
 
 		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
@@ -141,6 +144,8 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 			StartupMode.SPECIFIC_OFFSETS,
 			specificOffsets);
 
+		TableSourceUtil.validateTableSource(expected);
+
 		// construct table source using descriptors and table source factory
 
 		final TestTableDescriptor testDesc = new TestTableDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index af2baba..f6cbb2b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -21,6 +21,8 @@ package org.apache.flink.table.descriptors
 import java.util
 import java.util.Optional
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
 import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala}
 import org.apache.flink.table.descriptors.RowtimeValidator._
@@ -222,23 +224,28 @@ object SchemaValidator {
 
   /**
     * Finds a table source field mapping.
+    *
+    * @param properties The properties describing a schema.
+    * @param inputType  The input type that a connector and/or format produces. This parameter
+    *                   can be used to resolve a rowtime field against an input field.
     */
   def deriveFieldMapping(
       properties: DescriptorProperties,
-      sourceSchema: Optional[TableSchema])
+      inputType: Optional[TypeInformation[_]])
     : util.Map[String, String] = {
 
     val mapping = mutable.Map[String, String]()
 
     val schema = properties.getTableSchema(SCHEMA)
 
-    // add all source fields first because rowtime might reference one of them
-    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
-      names.foreach { name =>
-        mapping.put(name, name)
-      }
+    val columnNames = toScala(inputType) match {
+      case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
+      case _ => Seq[String]()
     }
 
+    // add all source fields first because rowtime might reference one of them
+    columnNames.foreach(name => mapping.put(name, name))
+
     // add all schema fields first for implicit mappings
     schema.getColumnNames.foreach { name =>
       mapping.put(name, name)
@@ -266,7 +273,7 @@ object SchemaValidator {
             mapping.remove(name)
           }
           // check for invalid fields
-          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+          else if (!columnNames.contains(name)) {
             throw new ValidationException(s"Could not map the schema field '$name' to a field " +
               s"from source. Please specify the source field from which it can be derived.")
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index c558057..a2eec4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -67,7 +67,7 @@ class SchemaValidatorTest {
       "myField" -> "myField").asJava
     assertEquals(
       expectedMapping,
-      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType)))
 
     // test field format
     val formatSchema = SchemaValidator.deriveFormatFields(props)
@@ -148,7 +148,7 @@ class SchemaValidatorTest {
       "myTime" -> "myTime").asJava
     assertEquals(
       expectedMapping,
-      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType)))
 
     // test field format
     val formatSchema = SchemaValidator.deriveFormatFields(props)