You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/16 13:26:11 UTC

[2/5] flink git commit: [FLINK-8014] [table] Add Kafka010JsonTableSink.

[FLINK-8014] [table] Add Kafka010JsonTableSink.

- Refactor KafkaTableSink tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50fba9aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50fba9aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50fba9aa

Branch: refs/heads/master
Commit: 50fba9aa4e96632f7b32cf98d704683364196cbd
Parents: fc3eebd
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:59:43 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 16 11:32:12 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010JsonTableSink.java | 73 ++++++++++++++++++++
 .../kafka/Kafka010JsonTableSinkTest.java        | 53 ++++++++++++++
 .../connectors/kafka/Kafka08JsonTableSink.java  | 26 ++++++-
 .../kafka/Kafka08JsonTableSinkTest.java         | 27 +++-----
 .../connectors/kafka/Kafka09JsonTableSink.java  | 26 ++++++-
 .../kafka/Kafka09JsonTableSinkTest.java         | 27 +++-----
 .../connectors/kafka/KafkaJsonTableSink.java    |  5 +-
 .../connectors/kafka/KafkaTableSink.java        | 10 ++-
 .../JsonRowSerializationSchema.java             | 22 +++++-
 .../kafka/JsonRowSerializationSchemaTest.java   | 46 ++++++++----
 .../kafka/KafkaTableSinkTestBase.java           | 30 ++++----
 11 files changed, 269 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
new file mode 100644
index 0000000..431ace0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka010JsonTableSink extends KafkaJsonTableSink {
+
+	/**
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+	 * topic with fixed partition assignment.
+	 *
+	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+	 * <ul>
+	 * <li>If the number of Kafka partitions is less than the number of sink instances, different
+	 * sink instances will write to the same partition.</li>
+	 * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+	 * Kafka partitions won't receive data.</li>
+	 * </ul>
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 */
+	public Kafka010JsonTableSink(String topic, Properties properties) {
+		super(topic, properties, new FlinkFixedPartitioner<>());
+	}
+
+	/**
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+	 * topic with custom partition assignment.
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
+	protected Kafka010JsonTableSink createCopy() {
+		return new Kafka010JsonTableSink(topic, properties, partitioner);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
new file mode 100644
index 0000000..4d805d5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010JsonTableSink}.
+ */
+public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+	@Override
+	protected KafkaTableSink createTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner) {
+
+		return new Kafka010JsonTableSink(topic, properties, partitioner);
+	}
+
+	@Override
+	protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+		return JsonRowSerializationSchema.class;
+	}
+
+	@Override
+	protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+		return FlinkKafkaProducer010.class;
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index a887048..39d5cb2 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@ import java.util.Properties;
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
 	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.8.
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+	 * topic with fixed partition assignment.
+	 *
+	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+	 * <ul>
+	 * <li>If the number of Kafka partitions is less than the number of sink instances, different
+	 * sink instances will write to the same partition.</li>
+	 * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+	 * Kafka partitions won't receive data.</li>
+	 * </ul>
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties) {
+		super(topic, properties, new FlinkFixedPartitioner<>());
+	}
+
+	/**
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.8.
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 890fc3a..d7bb683 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 	protected KafkaTableSink createTableSink(
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-		return new Kafka08JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-					String topic,
-					Properties properties,
-					SerializationSchema<Row> serializationSchema,
-					FlinkKafkaPartitioner<Row> partitioner) {
-
-				return kafkaProducer;
-			}
-		};
+			FlinkKafkaPartitioner<Row> partitioner) {
+
+		return new Kafka08JsonTableSink(topic, properties, partitioner);
+	}
+
+	@Override
+	protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+		return JsonRowSerializationSchema.class;
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	protected SerializationSchema<Row> getSerializationSchema() {
-		return new JsonRowSerializationSchema(FIELD_NAMES);
+	protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+		return FlinkKafkaProducer08.class;
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index f65a02d..a4d2661 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@ import java.util.Properties;
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
 	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.9 .
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+	 * topic with fixed partition assignment.
+	 *
+	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+	 * <ul>
+	 * <li>If the number of Kafka partitions is less than the number of sink instances, different
+	 * sink instances will write to the same partition.</li>
+	 * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+	 * Kafka partitions won't receive data.</li>
+	 * </ul>
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 */
+	public Kafka09JsonTableSink(String topic, Properties properties) {
+		super(topic, properties, new FlinkFixedPartitioner<>());
+	}
+
+	/**
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.9 .
+	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index c52b4ca..58f2b05 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 	protected KafkaTableSink createTableSink(
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-		return new Kafka09JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-					String topic,
-					Properties properties,
-					SerializationSchema<Row> serializationSchema,
-					FlinkKafkaPartitioner<Row> partitioner) {
-
-				return kafkaProducer;
-			}
-		};
+			FlinkKafkaPartitioner<Row> partitioner) {
+
+		return new Kafka09JsonTableSink(topic, properties, partitioner);
+	}
+
+	@Override
+	protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+		return JsonRowSerializationSchema.class;
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	protected SerializationSchema<Row> getSerializationSchema() {
-		return new JsonRowSerializationSchema(FIELD_NAMES);
+	protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+		return FlinkKafkaProducer09.class;
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index f354dad..6665dbd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
@@ -42,7 +43,7 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
 	}
 
 	@Override
-	protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
-		return new JsonRowSerializationSchema(fieldNames);
+	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+		return new JsonRowSerializationSchema(rowSchema);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index cac71dc..f42827e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -77,10 +77,10 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	/**
 	 * Create serialization schema for converting table rows into bytes.
 	 *
-	 * @param fieldNames Field names in table rows.
+	 * @param rowSchema the schema of the row to serialize.
 	 * @return Instance of serialization schema
 	 */
-	protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
 
 	/**
 	 * Create a deep copy of this sink.
@@ -92,6 +92,8 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
 		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+		// Always flush on checkpoint so that the sink is at-least-once when the query runs with checkpointing enabled.
+		kafkaProducer.setFlushOnCheckpoint(true);
 		dataStream.addSink(kafkaProducer);
 	}
 
@@ -116,7 +118,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 		copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
 		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
 			"Number of provided field names and types does not match.");
-		copy.serializationSchema = createSerializationSchema(fieldNames);
+
+		RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
+		copy.serializationSchema = createSerializationSchema(rowSchema);
 
 		return copy;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 5ece193..36d3137 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -43,10 +46,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 	/**
 	 * Creates a JSON serialization schema for the given fields and types.
 	 *
-	 * @param fieldNames Names of JSON fields to parse.
+	 * @param rowSchema The schema of the rows to encode.
 	 */
-	public JsonRowSerializationSchema(String[] fieldNames) {
-		this.fieldNames = Preconditions.checkNotNull(fieldNames);
+	public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
+
+		Preconditions.checkNotNull(rowSchema);
+		String[] fieldNames = rowSchema.getFieldNames();
+		TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
+
+		// check that no field is composite
+		for (int i = 0; i < fieldTypes.length; i++) {
+			if (fieldTypes[i] instanceof CompositeType) {
+				throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
+					"but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
+			}
+		}
+
+		this.fieldNames = fieldNames;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 43bde35..70140a6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.table.api.Types;
@@ -36,31 +37,34 @@ public class JsonRowSerializationSchemaTest {
 
 	@Test
 	public void testRowSerialization() throws IOException {
-		String[] fieldNames = new String[] {"f1", "f2", "f3"};
-		TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() };
+		RowTypeInfo rowSchema = new RowTypeInfo(
+			new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+			new String[] {"f1", "f2", "f3"}
+		);
+
 		Row row = new Row(3);
 		row.setField(0, 1);
 		row.setField(1, true);
 		row.setField(2, "str");
 
-		Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
+		Row resultRow = serializeAndDeserialize(rowSchema, row);
 		assertEqualRows(row, resultRow);
 	}
 
 	@Test
 	public void testSerializationOfTwoRows() throws IOException {
-		String[] fieldNames = new String[] {"f1", "f2", "f3"};
-		TypeInformation<Row> row = Types.ROW(
-			fieldNames,
-			new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }
+		RowTypeInfo rowSchema = new RowTypeInfo(
+			new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+			new String[] {"f1", "f2", "f3"}
 		);
+
 		Row row1 = new Row(3);
 		row1.setField(0, 1);
 		row1.setField(1, true);
 		row1.setField(2, "str");
 
-		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row);
+		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
 
 		byte[] bytes = serializationSchema.serialize(row1);
 		assertEqualRows(row1, deserializationSchema.deserialize(bytes));
@@ -79,19 +83,33 @@ public class JsonRowSerializationSchemaTest {
 		new JsonRowSerializationSchema(null);
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testRejectNestedSchema() {
+		RowTypeInfo rowSchema = new RowTypeInfo(
+			new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.ROW(Types.INT(), Types.DOUBLE())},
+			new String[] {"f1", "f2", "f3"}
+		);
+
+		new JsonRowSerializationSchema(rowSchema);
+	}
+
 	@Test(expected = IllegalStateException.class)
 	public void testSerializeRowWithInvalidNumberOfFields() {
-		String[] fieldNames = new String[] {"f1", "f2", "f3"};
+		RowTypeInfo rowSchema = new RowTypeInfo(
+			new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+			new String[] {"f1", "f2", "f3"}
+		);
+
 		Row row = new Row(1);
 		row.setField(0, 1);
 
-		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
 		serializationSchema.serialize(row);
 	}
 
-	private Row serializeAndDeserialize(String[] fieldNames, TypeInformation<?>[] fieldTypes, Row row) throws IOException {
-		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes));
+	private Row serializeAndDeserialize(RowTypeInfo rowSchema, Row row) throws IOException {
+		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
 
 		byte[] bytes = serializationSchema.serialize(row);
 		return deserializationSchema.deserialize(bytes);

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 3138152..ac5259e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 
@@ -46,32 +45,27 @@ import static org.mockito.Mockito.verify;
 public abstract class KafkaTableSinkTestBase {
 
 	private static final String TOPIC = "testTopic";
-	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+	private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
 	private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
-	@SuppressWarnings("unchecked")
-	private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>(
-		TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
-
-		@Override
-		protected void flush() {}
-	};
 
-	@Test
 	@SuppressWarnings("unchecked")
+	@Test
 	public void testKafkaTableSink() throws Exception {
 		DataStream dataStream = mock(DataStream.class);
 
 		KafkaTableSink kafkaTableSink = spy(createTableSink());
 		kafkaTableSink.emitDataStream(dataStream);
 
-		verify(dataStream).addSink(eq(producer));
+		// verify correct producer class
+		verify(dataStream).addSink(any(getProducerClass()));
 
+		// verify correctly configured producer
 		verify(kafkaTableSink).createKafkaProducer(
 			eq(TOPIC),
 			eq(PROPERTIES),
-			any(getSerializationSchema().getClass()),
+			any(getSerializationSchemaClass()),
 			eq(PARTITIONER));
 	}
 
@@ -86,13 +80,17 @@ public abstract class KafkaTableSinkTestBase {
 		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
-	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+	protected abstract KafkaTableSink createTableSink(
+		String topic,
+		Properties properties,
+		FlinkKafkaPartitioner<Row> partitioner);
+
+	protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass();
 
-	protected abstract SerializationSchema<Row> getSerializationSchema();
+	protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
 
 	private KafkaTableSink createTableSink() {
-		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
+		return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
 	}
 
 	private static Properties createSinkProperties() {