You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/06/23 08:14:54 UTC

[2/2] flink git commit: [FLINK-3872] [table, connector-kafka] Add KafkaTableSource

[FLINK-3872] [table, connector-kafka] Add KafkaTableSource

This closes #2069.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/479be148
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/479be148
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/479be148

Branch: refs/heads/master
Commit: 479be148ff3ee9369a5f05d7db589605ee088d18
Parents: 3ac080f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Jun 3 15:24:22 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Jun 23 10:14:20 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |  43 +++++
 .../flink-connector-kafka-0.8/pom.xml           |  11 +-
 .../kafka/Kafka08JsonTableSource.java           |  71 ++++++++
 .../connectors/kafka/Kafka08TableSource.java    |  75 +++++++++
 .../connectors/kafka/Kafka08ITCase.java         |  66 +++++++-
 .../flink-connector-kafka-0.9/pom.xml           |  10 +-
 .../kafka/Kafka09JsonTableSource.java           |  71 ++++++++
 .../connectors/kafka/Kafka09TableSource.java    |  75 +++++++++
 .../connectors/kafka/Kafka09ITCase.java         |  69 ++++++++
 .../connectors/kafka/KafkaJsonTableSource.java  |  97 +++++++++++
 .../connectors/kafka/KafkaTableSource.java      | 162 +++++++++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java | 128 ++++++++++++++-
 12 files changed, 870 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 35caa08..f0a6528 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -207,6 +207,49 @@ A `TableSource` can provide access to data stored in various storage systems suc
 
 Currently, Flink only provides a `CsvTableSource` to read CSV files. A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface.
 
+### Available Table Sources
+
+| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
+| `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files with up to 25 fields.
+| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data.
+| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data.
+
+All sources that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
+
+#### KafkaJsonTableSource
+
+To use the Kafka JSON source, you have to add the Kafka connector dependency to your project:
+
+  - `flink-connector-kafka-0.8` for Kafka 0.8, and
+  - `flink-connector-kafka-0.9` for Kafka 0.9, respectively.
+
+You can then create the source as follows (example for Kafka 0.8):
+
+```java
+// The JSON field names and types
+String[] fieldNames =  new String[] { "id", "name", "score"};
+Class<?>[] fieldTypes = new Class<?>[] { Integer.class, String.class, Double.class };
+
+KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
+    kafkaTopic,
+    kafkaProperties,
+    fieldNames,
+    fieldTypes);
+```
+
+By default, a missing JSON field does not fail the source. You can configure this via:
+
+```java
+// Fail on missing JSON field
+kafkaTableSource.setFailOnMissingField(true);
+```
+
+You can work with the Table as explained in the rest of the Table API guide:
+
+```java
+tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
+Table result = tableEnvironment.ingest("kafka-source");
+```
 
 Table API
 ----------

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index b2701c1..3557f1c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -55,7 +55,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
@@ -64,6 +63,16 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project,
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
 			<version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
new file mode 100644
index 0000000..63bb57e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka JSON {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08JsonTableSource extends KafkaJsonTableSource {
+
+	/**
+	 * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types, given as {@link TypeInformation}.
+	 */
+	public Kafka08JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types, given as classes.
+	 */
+	public Kafka08JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
new file mode 100644
index 0000000..8f51237
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08TableSource extends KafkaTableSource {
+
+	/**
+	 * Creates a Kafka 0.8 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types, given as {@link TypeInformation}.
+	 */
+	public Kafka08TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.8 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types, given as classes.
+	 */
+	public Kafka08TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 530c032..b393e5b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -18,8 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
-
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,6 +32,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -360,4 +362,66 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		curatorFramework.close();
 	}
+
+	@Test
+	public void testJsonTableSource() throws Exception {
+		String topic = UUID.randomUUID().toString();
+
+		// Names and types are determined in the actual test method of the
+		// base test class.
+		Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
+				topic,
+				standardProps,
+				new String[] {
+						"long",
+						"string",
+						"boolean",
+						"double",
+						"missing-field"},
+				new TypeInformation<?>[] {
+						BasicTypeInfo.LONG_TYPE_INFO,
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.BOOLEAN_TYPE_INFO,
+						BasicTypeInfo.DOUBLE_TYPE_INFO,
+						BasicTypeInfo.LONG_TYPE_INFO });
+
+		// Don't fail on a missing field; the field is set to null instead (default)
+		tableSource.setFailOnMissingField(false);
+
+		runJsonTableSource(topic, tableSource);
+	}
+
+	@Test
+	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
+		String topic = UUID.randomUUID().toString();
+
+		// Names and types are determined in the actual test method of the
+		// base test class.
+		Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
+				topic,
+				standardProps,
+				new String[] {
+						"long",
+						"string",
+						"boolean",
+						"double",
+						"missing-field"},
+				new TypeInformation<?>[] {
+						BasicTypeInfo.LONG_TYPE_INFO,
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.BOOLEAN_TYPE_INFO,
+						BasicTypeInfo.DOUBLE_TYPE_INFO,
+						BasicTypeInfo.LONG_TYPE_INFO });
+
+		// Fail on a missing field instead of setting it to null
+		tableSource.setFailOnMissingField(true);
+
+		try {
+			runJsonTableSource(topic, tableSource);
+			fail("Did not throw expected Exception");
+		} catch (Exception e) {
+			Throwable rootCause = e.getCause().getCause().getCause();
+			assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index e45a1d0..8feadb5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -70,7 +70,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
 			<version>${kafka.version}</version>
@@ -99,6 +98,15 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project,
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
new file mode 100644
index 0000000..975ef58
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka JSON {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09JsonTableSource extends KafkaJsonTableSource {
+
+	/**
+	 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types, given as {@link TypeInformation}.
+	 */
+	public Kafka09JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types, given as classes.
+	 */
+	public Kafka09JsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
new file mode 100644
index 0000000..03b5040
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09TableSource extends KafkaTableSource {
+
+	/**
+	 * Creates a Kafka 0.9 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types, given as {@link TypeInformation}.
+	 */
+	public Kafka09TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.9 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types, given as classes.
+	 */
+	public Kafka09TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index afb0056..ef64171 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -17,8 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.junit.Test;
 
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
@@ -115,4 +122,66 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runMetricsAndEndOfStreamTest();
 	}
 
+	@Test
+	public void testJsonTableSource() throws Exception {
+		String topic = UUID.randomUUID().toString();
+
+		// Names and types are determined in the actual test method of the
+		// base test class.
+		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
+				topic,
+				standardProps,
+				new String[] {
+						"long",
+						"string",
+						"boolean",
+						"double",
+						"missing-field"},
+				new TypeInformation<?>[] {
+						BasicTypeInfo.LONG_TYPE_INFO,
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.BOOLEAN_TYPE_INFO,
+						BasicTypeInfo.DOUBLE_TYPE_INFO,
+						BasicTypeInfo.LONG_TYPE_INFO });
+
+		// Don't fail on a missing field; the field is set to null instead (default)
+		tableSource.setFailOnMissingField(false);
+
+		runJsonTableSource(topic, tableSource);
+	}
+
+	@Test
+	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
+		String topic = UUID.randomUUID().toString();
+
+		// Names and types are determined in the actual test method of the
+		// base test class.
+		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
+				topic,
+				standardProps,
+				new String[] {
+						"long",
+						"string",
+						"boolean",
+						"double",
+						"missing-field"},
+				new TypeInformation<?>[] {
+						BasicTypeInfo.LONG_TYPE_INFO,
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.BOOLEAN_TYPE_INFO,
+						BasicTypeInfo.DOUBLE_TYPE_INFO,
+						BasicTypeInfo.LONG_TYPE_INFO });
+
+		// Fail on a missing field instead of setting it to null
+		tableSource.setFailOnMissingField(true);
+
+		try {
+			runJsonTableSource(topic, tableSource);
+			fail("Did not throw expected Exception");
+		} catch (Exception e) {
+			Throwable rootCause = e.getCause().getCause().getCause();
+			assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
new file mode 100644
index 0000000..f145509
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka JSON {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ *
+ * <p>The field names and types are used to parse the JSON records.
+ */
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
+
+	/**
+	 * Creates a generic Kafka JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types, given as classes.
+	 */
+	KafkaJsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a generic Kafka JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types, given as {@link TypeInformation}.
+	 */
+	KafkaJsonTableSource(
+			String topic,
+			Properties properties,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Configures the failure behaviour if a JSON field is missing.
+	 *
+	 * <p>By default, a missing field is ignored and the field is set to null.
+	 *
+	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+	 */
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
+		deserializationSchema.setFailOnMissingField(failOnMissingField);
+	}
+
+	private static JsonRowDeserializationSchema createDeserializationSchema(
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+	}
+
+	private static JsonRowDeserializationSchema createDeserializationSchema(
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
new file mode 100644
index 0000000..e43760b
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka table sources need to extend this class and
+ * implement {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ */
+abstract class KafkaTableSource implements StreamTableSource<Row> {
+
+	/** The Kafka topic to consume. */
+	private final String topic;
+
+	/** Properties for the Kafka consumer. */
+	private final Properties properties;
+
+	/** Deserialization schema to use for Kafka records. */
+	private final DeserializationSchema<Row> deserializationSchema;
+
+	/** Row field names. */
+	private final String[] fieldNames;
+
+	/** Row field types. */
+	private final TypeInformation<?>[] fieldTypes;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource} from field types given as classes.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names.
+	 * @param fieldTypes            Row field types as classes (converted to {@link TypeInformation}).
+	 */
+	KafkaTableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			Class<?>[] fieldTypes) {
+
+		this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}; no argument may be null.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param fieldNames            Row field names (must have the same length as fieldTypes).
+	 * @param fieldTypes            Row field types (must have the same length as fieldNames).
+	 */
+	KafkaTableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes) {
+
+		this.topic = Preconditions.checkNotNull(topic, "Topic");
+		this.properties = Preconditions.checkNotNull(properties, "Properties");
+		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
+		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+				"Number of provided field names and types does not match.");
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+		// The concrete consumer is supplied by the version-specific subclass
+		FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
+		DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
+		return kafkaSource;
+	}
+
+	@Override
+	public int getNumberOfFields() {
+		return fieldNames.length;
+	}
+
+	@Override
+	public String[] getFieldsNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		return new RowTypeInfo(fieldTypes, fieldNames);
+	}
+
+	/**
+	 * Returns the version-specific Kafka consumer.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer.
+	 */
+	abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema);
+
+	/**
+	 * Returns the deserialization schema.
+	 *
+	 * @return The deserialization schema passed to the constructor.
+	 */
+	protected DeserializationSchema<Row> getDeserializationSchema() {
+		return deserializationSchema;
+	}
+
+	/**
+	 * Creates a {@link TypeInformation} array from an array of classes via {@link TypeExtractor#getForClass(Class)}.
+	 */
+	private static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
+		TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
+		}
+		return typeInfos;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 660f24c..220f061 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -25,7 +27,6 @@ import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import kafka.server.KafkaServer;
-
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -38,9 +39,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.table.StreamTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -74,23 +78,21 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
-
 import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
-
 import org.junit.Before;
 import org.junit.Rule;
 
@@ -107,6 +109,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -736,6 +739,121 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
+	/**
+	 * Runs a table source test with JSON data, expecting rows of arity 5 in production order.
+	 *
+	 * The table source needs to parse the following JSON fields (a fifth, absent field must be null):
+	 * - "long" -> number
+	 * - "string" -> "string"
+	 * - "boolean" -> true|false
+	 * - "double" -> fraction
+	 */
+	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
+		final ObjectMapper mapper = new ObjectMapper();
+
+		final int numElements = 1024;
+		final long[] longs = new long[numElements];
+		final String[] strings = new String[numElements];
+		final boolean[] booleans = new boolean[numElements];
+		final double[] doubles = new double[numElements];
+
+		final byte[][] serializedJson = new byte[numElements][];
+
+		ThreadLocalRandom random = ThreadLocalRandom.current();
+
+		for (int i = 0; i < numElements; i++) {
+			longs[i] = random.nextLong();
+			strings[i] = Integer.toHexString(random.nextInt());
+			booleans[i] = random.nextBoolean();
+			doubles[i] = random.nextDouble();
+
+			ObjectNode entry = mapper.createObjectNode();
+			entry.put("long", longs[i]);
+			entry.put("string", strings[i]);
+			entry.put("boolean", booleans[i]);
+			entry.put("double", doubles[i]);
+
+			serializedJson[i] = mapper.writeValueAsBytes(entry);
+		}
+
+		// Produce serialized JSON data
+		createTestTopic(topic, 1, 1);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+
+		env.addSource(new SourceFunction<byte[]>() {
+			@Override
+			public void run(SourceContext<byte[]> ctx) throws Exception {
+				for (int i = 0; i < numElements; i++) {
+					ctx.collect(serializedJson[i]);
+				}
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).addSink(kafkaServer.getProducer(
+				topic,
+				new ByteArraySerializationSchema(),
+				standardProps,
+				null));
+
+		// Execute blocks, i.e. the producer job is run before the table source job is set up
+		env.execute();
+
+		// Register as table source
+		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
+
+		Table result = tableEnvironment.ingest("kafka");
+
+		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
+			// Index of the next expected record; rows are compared against the generated values in order
+			int i = 0;
+
+			@Override
+			public void invoke(Row value) throws Exception {
+				assertEquals(5, value.productArity());
+				assertEquals(longs[i], value.productElement(0));
+				assertEquals(strings[i], value.productElement(1));
+				assertEquals(booleans[i], value.productElement(2));
+				assertEquals(doubles[i], value.productElement(3));
+				assertNull(value.productElement(4));
+				// Signal success once the last element has been verified
+				if (i == numElements-1) {
+					throw new SuccessException();
+				} else {
+					i++;
+				}
+			}
+		});
+
+		tryExecutePropagateExceptions(env, "KafkaTableSource");
+	}
+
+	/**
+	 * Serialization schema forwarding byte[] records unchanged as the value, without a key.
+	 */
+	private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {
+
+		@Override
+		public byte[] serializeKey(byte[] element) {
+			return null; // no key is produced for the record
+		}
+
+		@Override
+		public byte[] serializeValue(byte[] element) {
+			return element;
+		}
+
+		@Override
+		public String getTargetTopic(byte[] element) {
+			return null; // no record-specific target topic
+		}
+	}
+
 	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
 			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {