You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/16 13:26:11 UTC
[2/5] flink git commit: [FLINK-8014] [table] Add
Kafka010JsonTableSink.
[FLINK-8014] [table] Add Kafka010JsonTableSink.
- Refactor KafkaTableSink tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50fba9aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50fba9aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50fba9aa
Branch: refs/heads/master
Commit: 50fba9aa4e96632f7b32cf98d704683364196cbd
Parents: fc3eebd
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:59:43 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 16 11:32:12 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/Kafka010JsonTableSink.java | 73 ++++++++++++++++++++
.../kafka/Kafka010JsonTableSinkTest.java | 53 ++++++++++++++
.../connectors/kafka/Kafka08JsonTableSink.java | 26 ++++++-
.../kafka/Kafka08JsonTableSinkTest.java | 27 +++-----
.../connectors/kafka/Kafka09JsonTableSink.java | 26 ++++++-
.../kafka/Kafka09JsonTableSinkTest.java | 27 +++-----
.../connectors/kafka/KafkaJsonTableSink.java | 5 +-
.../connectors/kafka/KafkaTableSink.java | 10 ++-
.../JsonRowSerializationSchema.java | 22 +++++-
.../kafka/JsonRowSerializationSchemaTest.java | 46 ++++++++----
.../kafka/KafkaTableSinkTestBase.java | 30 ++++----
11 files changed, 269 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
new file mode 100644
index 0000000..431ace0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka010JsonTableSink extends KafkaJsonTableSink {
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+ * topic with fixed partition assignment.
+ *
+ * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+ * <ul>
+ * <li>If the number of Kafka partitions is less than the number of sink instances, different
+ * sink instances will write to the same partition.</li>
+ * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+ * Kafka partitions won't receive data.</li>
+ * </ul>
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ */
+ public Kafka010JsonTableSink(String topic, Properties properties) {
+ super(topic, properties, new FlinkFixedPartitioner<>());
+ }
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+ * topic with custom partition assignment.
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
+
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
+ }
+
+ @Override
+ protected Kafka010JsonTableSink createCopy() {
+ return new Kafka010JsonTableSink(topic, properties, partitioner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
new file mode 100644
index 0000000..4d805d5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010JsonTableSink}.
+ */
+public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+ @Override
+ protected KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner) {
+
+ return new Kafka010JsonTableSink(topic, properties, partitioner);
+ }
+
+ @Override
+ protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+ return JsonRowSerializationSchema.class;
+ }
+
+ @Override
+ protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+ return FlinkKafkaProducer010.class;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index a887048..39d5cb2 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@ import java.util.Properties;
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
/**
- * Creates {@link KafkaTableSink} for Kafka 0.8.
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+ * topic with fixed partition assignment.
+ *
+ * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+ * <ul>
+ * <li>If the number of Kafka partitions is less than the number of sink instances, different
+ * sink instances will write to the same partition.</li>
+ * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+ * Kafka partitions won't receive data.</li>
+ * </ul>
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ */
+ public Kafka08JsonTableSink(String topic, Properties properties) {
+ super(topic, properties, new FlinkFixedPartitioner<>());
+ }
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
}
/**
- * Creates {@link KafkaTableSink} for Kafka 0.8.
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 890fc3a..d7bb683 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
protected KafkaTableSink createTableSink(
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka08JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic,
- Properties properties,
- SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
-
- return kafkaProducer;
- }
- };
+ FlinkKafkaPartitioner<Row> partitioner) {
+
+ return new Kafka08JsonTableSink(topic, properties, partitioner);
+ }
+
+ @Override
+ protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+ return JsonRowSerializationSchema.class;
}
@Override
- @SuppressWarnings("unchecked")
- protected SerializationSchema<Row> getSerializationSchema() {
- return new JsonRowSerializationSchema(FIELD_NAMES);
+ protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+ return FlinkKafkaProducer08.class;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index f65a02d..a4d2661 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@ import java.util.Properties;
public class Kafka09JsonTableSink extends KafkaJsonTableSink {
/**
- * Creates {@link KafkaTableSink} for Kafka 0.9 .
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+ * topic with fixed partition assignment.
+ *
+ * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+ * <ul>
+ * <li>If the number of Kafka partitions is less than the number of sink instances, different
+ * sink instances will write to the same partition.</li>
+ * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+ * Kafka partitions won't receive data.</li>
+ * </ul>
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ */
+ public Kafka09JsonTableSink(String topic, Properties properties) {
+ super(topic, properties, new FlinkFixedPartitioner<>());
+ }
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
}
/**
- * Creates {@link KafkaTableSink} for Kafka 0.9 .
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index c52b4ca..58f2b05 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
protected KafkaTableSink createTableSink(
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka09JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic,
- Properties properties,
- SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
-
- return kafkaProducer;
- }
- };
+ FlinkKafkaPartitioner<Row> partitioner) {
+
+ return new Kafka09JsonTableSink(topic, properties, partitioner);
+ }
+
+ @Override
+ protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+ return JsonRowSerializationSchema.class;
}
@Override
- @SuppressWarnings("unchecked")
- protected SerializationSchema<Row> getSerializationSchema() {
- return new JsonRowSerializationSchema(FIELD_NAMES);
+ protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+ return FlinkKafkaProducer09.class;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index f354dad..6665dbd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
@@ -42,7 +43,7 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
}
@Override
- protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
- return new JsonRowSerializationSchema(fieldNames);
+ protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+ return new JsonRowSerializationSchema(rowSchema);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index cac71dc..f42827e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -77,10 +77,10 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
/**
* Create serialization schema for converting table rows into bytes.
*
- * @param fieldNames Field names in table rows.
+ * @param rowSchema the schema of the row to serialize.
* @return Instance of serialization schema
*/
- protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+ protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
/**
* Create a deep copy of this sink.
@@ -92,6 +92,8 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@Override
public void emitDataStream(DataStream<Row> dataStream) {
FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+ // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
+ kafkaProducer.setFlushOnCheckpoint(true);
dataStream.addSink(kafkaProducer);
}
@@ -116,7 +118,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
"Number of provided field names and types does not match.");
- copy.serializationSchema = createSerializationSchema(fieldNames);
+
+ RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
+ copy.serializationSchema = createSerializationSchema(rowSchema);
return copy;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 5ece193..36d3137 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -18,6 +18,9 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -43,10 +46,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
/**
* Creates a JSON serialization schema for the given fields and types.
*
- * @param fieldNames Names of JSON fields to parse.
+ * @param rowSchema The schema of the rows to encode.
*/
- public JsonRowSerializationSchema(String[] fieldNames) {
- this.fieldNames = Preconditions.checkNotNull(fieldNames);
+ public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
+
+ Preconditions.checkNotNull(rowSchema);
+ String[] fieldNames = rowSchema.getFieldNames();
+ TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
+
+ // check that no field is composite
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (fieldTypes[i] instanceof CompositeType) {
+ throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
+ "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
+ }
+ }
+
+ this.fieldNames = fieldNames;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 43bde35..70140a6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.table.api.Types;
@@ -36,31 +37,34 @@ public class JsonRowSerializationSchemaTest {
@Test
public void testRowSerialization() throws IOException {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() };
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+ new String[] {"f1", "f2", "f3"}
+ );
+
Row row = new Row(3);
row.setField(0, 1);
row.setField(1, true);
row.setField(2, "str");
- Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
+ Row resultRow = serializeAndDeserialize(rowSchema, row);
assertEqualRows(row, resultRow);
}
@Test
public void testSerializationOfTwoRows() throws IOException {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- TypeInformation<Row> row = Types.ROW(
- fieldNames,
- new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+ new String[] {"f1", "f2", "f3"}
);
+
Row row1 = new Row(3);
row1.setField(0, 1);
row1.setField(1, true);
row1.setField(2, "str");
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row);
+ JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+ JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
byte[] bytes = serializationSchema.serialize(row1);
assertEqualRows(row1, deserializationSchema.deserialize(bytes));
@@ -79,19 +83,33 @@ public class JsonRowSerializationSchemaTest {
new JsonRowSerializationSchema(null);
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testRejectNestedSchema() {
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.ROW(Types.INT(), Types.DOUBLE())},
+ new String[] {"f1", "f2", "f3"}
+ );
+
+ new JsonRowSerializationSchema(rowSchema);
+ }
+
@Test(expected = IllegalStateException.class)
public void testSerializeRowWithInvalidNumberOfFields() {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+ new String[] {"f1", "f2", "f3"}
+ );
+
Row row = new Row(1);
row.setField(0, 1);
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+ JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
serializationSchema.serialize(row);
}
- private Row serializeAndDeserialize(String[] fieldNames, TypeInformation<?>[] fieldTypes, Row row) throws IOException {
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes));
+ private Row serializeAndDeserialize(RowTypeInfo rowSchema, Row row) throws IOException {
+ JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+ JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
byte[] bytes = serializationSchema.serialize(row);
return deserializationSchema.deserialize(bytes);
http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 3138152..ac5259e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
@@ -46,32 +45,27 @@ import static org.mockito.Mockito.verify;
public abstract class KafkaTableSinkTestBase {
private static final String TOPIC = "testTopic";
- protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+ private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
- @SuppressWarnings("unchecked")
- private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>(
- TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
-
- @Override
- protected void flush() {}
- };
- @Test
@SuppressWarnings("unchecked")
+ @Test
public void testKafkaTableSink() throws Exception {
DataStream dataStream = mock(DataStream.class);
KafkaTableSink kafkaTableSink = spy(createTableSink());
kafkaTableSink.emitDataStream(dataStream);
- verify(dataStream).addSink(eq(producer));
+ // verify correct producer class
+ verify(dataStream).addSink(any(getProducerClass()));
+ // verify correctly configured producer
verify(kafkaTableSink).createKafkaProducer(
eq(TOPIC),
eq(PROPERTIES),
- any(getSerializationSchema().getClass()),
+ any(getSerializationSchemaClass()),
eq(PARTITIONER));
}
@@ -86,13 +80,17 @@ public abstract class KafkaTableSinkTestBase {
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
- protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
- FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+ protected abstract KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner);
+
+ protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass();
- protected abstract SerializationSchema<Row> getSerializationSchema();
+ protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
private KafkaTableSink createTableSink() {
- return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
+ return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
}
private static Properties createSinkProperties() {