You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/25 06:02:38 UTC
flink git commit: [FLINK-9934] [table] Fix invalid field mapping by
Kafka table source factory
Repository: flink
Updated Branches:
refs/heads/master 3f2232668 -> 378cbb7c2
[FLINK-9934] [table] Fix invalid field mapping by Kafka table source factory
According to the DefinedFieldMapping interface, the field mapping can also contain
the input fields. However, the Kafka table source factory was calling
SchemaValidator#deriveFieldMapping with its own schema instead of the input type.
This closes #6403.
This closes #3124.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/378cbb7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/378cbb7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/378cbb7c
Branch: refs/heads/master
Commit: 378cbb7c2e580ba73f215234e7dff542c3e2bc97
Parents: 3f22326
Author: Timo Walther <tw...@apache.org>
Authored: Tue Jul 24 11:40:36 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Jul 25 08:01:07 2018 +0200
----------------------------------------------------------------------
.../kafka/KafkaTableSourceSinkFactoryBase.java | 14 ++++++++-----
.../KafkaJsonTableSourceFactoryTestBase.java | 5 +++++
.../KafkaTableSourceSinkFactoryTestBase.java | 5 +++++
.../table/descriptors/SchemaValidator.scala | 21 +++++++++++++-------
.../table/descriptors/SchemaValidatorTest.scala | 4 ++--
5 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 3307994..27b2e67 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -132,18 +132,20 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
- final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+ final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
return createKafkaTableSource(
- schema,
+ descriptorProperties.getTableSchema(SCHEMA()),
SchemaValidator.deriveProctimeAttribute(descriptorProperties),
SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
- SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)),
+ SchemaValidator.deriveFieldMapping(
+ descriptorProperties,
+ Optional.of(deserializationSchema.getProducedType())),
topic,
getKafkaProperties(descriptorProperties),
- getDeserializationSchema(properties),
+ deserializationSchema,
startupOptions.startupMode,
startupOptions.specificOffsets);
}
@@ -318,7 +320,9 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
}
private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
- final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema));
+ final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(
+ descriptorProperties,
+ Optional.of(schema.toRowType())); // until FLINK-9870 is fixed we assume that the table schema is the output type
return fieldMapping.size() != schema.getColumnNames().length ||
!fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 20da156..51c0e7b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceUtil;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
@@ -101,7 +102,9 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
final Map<String, String> tableJsonMapping = new HashMap<>();
tableJsonMapping.put("fruit-name", "name");
+ tableJsonMapping.put("name", "name");
tableJsonMapping.put("count", "count");
+ tableJsonMapping.put("time", "time");
final Properties props = new Properties();
props.put("group.id", "test-group");
@@ -129,6 +132,8 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
.withRowtimeAttribute("event-time", new ExistingField("time"), new AscendingTimestamps())
.build();
+ TableSourceUtil.validateTableSource(builderSource);
+
// construct table source using descriptors and table source factory
final Map<Integer, Long> offsets = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index d8e8f7d..504bed1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.factories.utils.TestTableFormat;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceUtil;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
@@ -115,7 +116,9 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
final Map<String, String> fieldMapping = new HashMap<>();
fieldMapping.put(FRUIT_NAME, NAME);
+ fieldMapping.put(NAME, NAME);
fieldMapping.put(COUNT, COUNT);
+ fieldMapping.put(TIME, TIME);
final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
@@ -141,6 +144,8 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
StartupMode.SPECIFIC_OFFSETS,
specificOffsets);
+ TableSourceUtil.validateTableSource(expected);
+
// construct table source using descriptors and table source factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index af2baba..f6cbb2b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -21,6 +21,8 @@ package org.apache.flink.table.descriptors
import java.util
import java.util.Optional
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala}
import org.apache.flink.table.descriptors.RowtimeValidator._
@@ -222,23 +224,28 @@ object SchemaValidator {
/**
* Finds a table source field mapping.
+ *
+ * @param properties The properties describing a schema.
+ * @param inputType The input type that a connector and/or format produces. This parameter
+ * can be used to resolve a rowtime field against an input field.
*/
def deriveFieldMapping(
properties: DescriptorProperties,
- sourceSchema: Optional[TableSchema])
+ inputType: Optional[TypeInformation[_]])
: util.Map[String, String] = {
val mapping = mutable.Map[String, String]()
val schema = properties.getTableSchema(SCHEMA)
- // add all source fields first because rowtime might reference one of them
- toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
- names.foreach { name =>
- mapping.put(name, name)
- }
+ val columnNames = toScala(inputType) match {
+ case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
+ case _ => Seq[String]()
}
+ // add all source fields first because rowtime might reference one of them
+ columnNames.foreach(name => mapping.put(name, name))
+
// add all schema fields first for implicit mappings
schema.getColumnNames.foreach { name =>
mapping.put(name, name)
@@ -266,7 +273,7 @@ object SchemaValidator {
mapping.remove(name)
}
// check for invalid fields
- else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+ else if (!columnNames.contains(name)) {
throw new ValidationException(s"Could not map the schema field '$name' to a field " +
s"from source. Please specify the source field from which it can be derived.")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/378cbb7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index c558057..a2eec4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -67,7 +67,7 @@ class SchemaValidatorTest {
"myField" -> "myField").asJava
assertEquals(
expectedMapping,
- SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+ SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType)))
// test field format
val formatSchema = SchemaValidator.deriveFormatFields(props)
@@ -148,7 +148,7 @@ class SchemaValidatorTest {
"myTime" -> "myTime").asJava
assertEquals(
expectedMapping,
- SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+ SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType)))
// test field format
val formatSchema = SchemaValidator.deriveFormatFields(props)