Posted to commits@flink.apache.org by uc...@apache.org on 2016/06/23 08:14:54 UTC
[2/2] flink git commit: [FLINK-3872] [table, connector-kafka] Add KafkaTableSource
[FLINK-3872] [table, connector-kafka] Add KafkaTableSource
This closes #2069.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/479be148
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/479be148
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/479be148
Branch: refs/heads/master
Commit: 479be148ff3ee9369a5f05d7db589605ee088d18
Parents: 3ac080f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Jun 3 15:24:22 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Jun 23 10:14:20 2016 +0200
----------------------------------------------------------------------
docs/apis/table.md | 43 +++++
.../flink-connector-kafka-0.8/pom.xml | 11 +-
.../kafka/Kafka08JsonTableSource.java | 71 ++++++++
.../connectors/kafka/Kafka08TableSource.java | 75 +++++++++
.../connectors/kafka/Kafka08ITCase.java | 66 +++++++-
.../flink-connector-kafka-0.9/pom.xml | 10 +-
.../kafka/Kafka09JsonTableSource.java | 71 ++++++++
.../connectors/kafka/Kafka09TableSource.java | 75 +++++++++
.../connectors/kafka/Kafka09ITCase.java | 69 ++++++++
.../connectors/kafka/KafkaJsonTableSource.java | 97 +++++++++++
.../connectors/kafka/KafkaTableSource.java | 162 +++++++++++++++++++
.../connectors/kafka/KafkaConsumerTestBase.java | 128 ++++++++++++++-
12 files changed, 870 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 35caa08..f0a6528 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -207,6 +207,49 @@ A `TableSource` can provide access to data stored in various storage systems suc
Currently, Flink only provides a `CsvTableSource` to read CSV files. A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface.
+### Available Table Sources
+
+| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description** |
+| -------------- | -------------------- | ---------- | -------------- | --------------- |
+| `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files with up to 25 fields. |
+| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data. |
+| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data. |
+
+All sources that come with the `flink-table` dependency can be used directly in your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
+
+#### KafkaJsonTableSource
+
+To use the Kafka JSON source, you have to add the Kafka connector dependency to your project:
+
+ - `flink-connector-kafka-0.8` for Kafka 0.8, or
+ - `flink-connector-kafka-0.9` for Kafka 0.9.
+
+You can then create the source as follows (example for Kafka 0.8):
+
+```java
+// The JSON field names and types
+String[] fieldNames = new String[] { "id", "name", "score"};
+Class<?>[] fieldTypes = new Class<?>[] { Integer.class, String.class, Double.class };
+
+KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
+ kafkaTopic,
+ kafkaProperties,
+ fieldNames,
+ fieldTypes);
+```
+
+By default, a missing JSON field does not fail the source. You can configure this via:
+
+```java
+// Fail on missing JSON field
+tableSource.setFailOnMissingField(true);
+```
+
+You can work with the Table as explained in the rest of the Table API guide:
+
+```java
+tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
+Table result = tableEnvironment.ingest("kafka-source");
+```
Table API
----------
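The docs above note that a custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. Below is a minimal sketch of a custom `StreamTableSource<Row>`, written against the interface methods that `KafkaTableSource` later in this commit implements (`getDataStream`, `getNumberOfFields`, `getFieldsNames`, `getFieldTypes`, `getReturnType`); the class name and the hard-coded row are illustrative, not part of the commit:

```java
import java.util.Collections;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical source that emits a single hard-coded row, for illustration only. */
public class SingleRowTableSource implements StreamTableSource<Row> {

	private final String[] fieldNames = new String[] { "id", "name" };

	private final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
			BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO };

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
		Row row = new Row(2);
		row.setField(0, 1);
		row.setField(1, "flink");
		// Provide the row type explicitly, since Row is a generic container.
		return env.fromCollection(Collections.singletonList(row), getReturnType());
	}

	@Override
	public int getNumberOfFields() {
		return fieldNames.length;
	}

	@Override
	public String[] getFieldsNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TypeInformation<Row> getReturnType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}
}
```

Such a source would be registered like any other, e.g. `tableEnvironment.registerTableSource("single-row", new SingleRowTableSource())`.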
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index b2701c1..3557f1c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -55,7 +55,6 @@ under the License.
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
@@ -64,6 +63,16 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <!-- Projects depending on this project
+ won't depend on flink-table. -->
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
new file mode 100644
index 0000000..63bb57e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka JSON {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08JsonTableSource extends KafkaJsonTableSource {
+
+ /**
+ * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka08JsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka08JsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, fieldNames, fieldTypes);
+ }
+
+ @Override
+ FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
new file mode 100644
index 0000000..8f51237
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08TableSource extends KafkaTableSource {
+
+ /**
+ * Creates a Kafka 0.8 {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka08TableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a Kafka 0.8 {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka08TableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+ }
+
+ @Override
+ FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+ }
+}
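Unlike the JSON variant, `Kafka08TableSource` above takes an explicit `DeserializationSchema<Row>`, so arbitrary byte formats can be mapped to table rows. A minimal sketch of such a schema, assuming each Kafka record is one UTF-8 string; the class name and one-field layout are illustrative, not part of the commit:

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

/** Hypothetical schema mapping each UTF-8 record to a one-field Row. */
public class StringRowDeserializationSchema implements DeserializationSchema<Row> {

	@Override
	public Row deserialize(byte[] message) throws IOException {
		Row row = new Row(1);
		row.setField(0, new String(message, "UTF-8"));
		return row;
	}

	@Override
	public boolean isEndOfStream(Row nextElement) {
		return false; // keep consuming the topic
	}

	@Override
	public TypeInformation<Row> getProducedType() {
		return new RowTypeInfo(
				new TypeInformation<?>[] { BasicTypeInfo.STRING_TYPE_INFO },
				new String[] { "value" });
	}
}
```

An instance of this schema would be passed to the `Kafka08TableSource` constructor together with a matching field name and type.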
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 530c032..b393e5b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -18,8 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.curator.framework.CuratorFramework;
-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
@@ -360,4 +362,66 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
curatorFramework.close();
}
+
+ @Test
+ public void testJsonTableSource() throws Exception {
+ String topic = UUID.randomUUID().toString();
+
+ // Names and types are determined in the actual test method of the
+ // base test class.
+ Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
+ topic,
+ standardProps,
+ new String[] {
+ "long",
+ "string",
+ "boolean",
+ "double",
+ "missing-field"},
+ new TypeInformation<?>[] {
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO });
+
+ // Don't fail on missing field, but set to null (default)
+ tableSource.setFailOnMissingField(false);
+
+ runJsonTableSource(topic, tableSource);
+ }
+
+ @Test
+ public void testJsonTableSourceWithFailOnMissingField() throws Exception {
+ String topic = UUID.randomUUID().toString();
+
+ // Names and types are determined in the actual test method of the
+ // base test class.
+ Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
+ topic,
+ standardProps,
+ new String[] {
+ "long",
+ "string",
+ "boolean",
+ "double",
+ "missing-field"},
+ new TypeInformation<?>[] {
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO });
+
+ // Fail on missing field
+ tableSource.setFailOnMissingField(true);
+
+ try {
+ runJsonTableSource(topic, tableSource);
+ fail("Did not throw expected Exception");
+ } catch (Exception e) {
+ Throwable rootCause = e.getCause().getCause().getCause();
+ assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index e45a1d0..8feadb5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -70,7 +70,6 @@ under the License.
</dependency>
<dependency>
-
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
@@ -99,6 +98,15 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <!-- Projects depending on this project
+ won't depend on flink-table. -->
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
new file mode 100644
index 0000000..975ef58
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka JSON {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09JsonTableSource extends KafkaJsonTableSource {
+
+ /**
+ * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09JsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09JsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, fieldNames, fieldTypes);
+ }
+
+ @Override
+ FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
new file mode 100644
index 0000000..03b5040
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09TableSource extends KafkaTableSource {
+
+ /**
+ * Creates a Kafka 0.9 {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09TableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a Kafka 0.9 {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ public Kafka09TableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+ }
+
+ @Override
+ FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+ return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index afb0056..ef64171 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -17,8 +17,15 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.junit.Test;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class Kafka09ITCase extends KafkaConsumerTestBase {
@@ -115,4 +122,66 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
runMetricsAndEndOfStreamTest();
}
+ @Test
+ public void testJsonTableSource() throws Exception {
+ String topic = UUID.randomUUID().toString();
+
+ // Names and types are determined in the actual test method of the
+ // base test class.
+ Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
+ topic,
+ standardProps,
+ new String[] {
+ "long",
+ "string",
+ "boolean",
+ "double",
+ "missing-field"},
+ new TypeInformation<?>[] {
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO });
+
+ // Don't fail on missing field, but set to null (default)
+ tableSource.setFailOnMissingField(false);
+
+ runJsonTableSource(topic, tableSource);
+ }
+
+ @Test
+ public void testJsonTableSourceWithFailOnMissingField() throws Exception {
+ String topic = UUID.randomUUID().toString();
+
+ // Names and types are determined in the actual test method of the
+ // base test class.
+ Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
+ topic,
+ standardProps,
+ new String[] {
+ "long",
+ "string",
+ "boolean",
+ "double",
+ "missing-field"},
+ new TypeInformation<?>[] {
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO });
+
+ // Fail on missing field
+ tableSource.setFailOnMissingField(true);
+
+ try {
+ runJsonTableSource(topic, tableSource);
+ fail("Did not throw expected Exception");
+ } catch (Exception e) {
+ Throwable rootCause = e.getCause().getCause().getCause();
+ assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
new file mode 100644
index 0000000..f145509
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka JSON {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka table sources need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ *
+ * <p>The field names and types are used to parse the JSON records.
+ */
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaJsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+ }
+
+ /**
+ * Creates a generic Kafka JSON {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaJsonTableSource(
+ String topic,
+ Properties properties,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+ }
+
+ /**
+ * Configures the failure behaviour if a JSON field is missing.
+ *
+ * <p>By default, a missing field is ignored and the field is set to null.
+ *
+ * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+ */
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
+ deserializationSchema.setFailOnMissingField(failOnMissingField);
+ }
+
+ private static JsonRowDeserializationSchema createDeserializationSchema(
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+ }
+
+ private static JsonRowDeserializationSchema createDeserializationSchema(
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+ }
+}
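The effect of `setFailOnMissingField` is easiest to see on a concrete record. A short sketch of both modes, assuming a `kafkaProps` Properties object and an illustrative topic name:

```java
// Schema expects two JSON fields, "id" and "name".
Kafka08JsonTableSource source = new Kafka08JsonTableSource(
	"my-topic",    // assumed topic name
	kafkaProps,    // assumed consumer properties
	new String[] { "id", "name" },
	new Class<?>[] { Integer.class, String.class });

// Default (lenient): the record {"id": 1} yields the row (1, null).
source.setFailOnMissingField(false);

// Strict: the same record makes deserialization throw, which fails the
// job; the ITCases above assert IllegalStateException as the root cause.
source.setFailOnMissingField(true);
```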
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
new file mode 100644
index 0000000..e43760b
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka table sources need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ */
+abstract class KafkaTableSource implements StreamTableSource<Row> {
+
+ /** The Kafka topic to consume. */
+ private final String topic;
+
+ /** Properties for the Kafka consumer. */
+ private final Properties properties;
+
+ /** Deserialization schema to use for Kafka records. */
+ private final DeserializationSchema<Row> deserializationSchema;
+
+ /** Row field names. */
+ private final String[] fieldNames;
+
+ /** Row field types. */
+ private final TypeInformation<?>[] fieldTypes;
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaTableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
+ }
+
+ /**
+ * Creates a generic Kafka {@link StreamTableSource}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @param fieldNames Row field names.
+ * @param fieldTypes Row field types.
+ */
+ KafkaTableSource(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ this.topic = Preconditions.checkNotNull(topic, "Topic");
+ this.properties = Preconditions.checkNotNull(properties, "Properties");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
+ this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
+ this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+
+ Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+ "Number of provided field names and types does not match.");
+ }
+
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+ // Version-specific Kafka consumer
+ FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
+ DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
+ return kafkaSource;
+ }
+
+ @Override
+ public int getNumberOfFields() {
+ return fieldNames.length;
+ }
+
+ @Override
+ public String[] getFieldsNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public TypeInformation<Row> getReturnType() {
+ return new RowTypeInfo(fieldTypes, fieldNames);
+ }
+
+ /**
+ * Returns the version-specific Kafka consumer.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema to use for Kafka records.
+ * @return The version-specific Kafka consumer
+ */
+ abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema);
+
+ /**
+ * Returns the deserialization schema.
+ *
+ * @return The deserialization schema
+ */
+ protected DeserializationSchema<Row> getDeserializationSchema() {
+ return deserializationSchema;
+ }
+
+ /**
+ * Creates TypeInformation array for an array of Classes.
+ */
+ private static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
+ TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
+ }
+ return typeInfos;
+ }
+
+}
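The `Class`-based constructor is a convenience: `toTypeInfo` converts each `Class` with `TypeExtractor.getForClass`, so both constructor variants end up with the same field types. A small sketch of that equivalence:

```java
// For a basic type, the extractor returns the predefined type information,
// i.e. the same value one would pass via the TypeInformation<?>[] constructor.
TypeInformation<?> byClass = TypeExtractor.getForClass(Long.class);
assertEquals(BasicTypeInfo.LONG_TYPE_INFO, byClass);
```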
http://git-wip-us.apache.org/repos/asf/flink/blob/479be148/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 660f24c..220f061 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
@@ -25,7 +27,6 @@ import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.server.KafkaServer;
-
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
@@ -38,9 +39,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
@@ -74,23 +78,21 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
-
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
-
import org.junit.Before;
import org.junit.Rule;
@@ -107,6 +109,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -736,6 +739,121 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
+ /**
+ * Runs a table source test with JSON data.
+ *
+ * The table source needs to parse the following JSON fields:
+ * - "long" -> number
+ * - "string" -> "string"
+ * - "boolean" -> true|false
+ * - "double" -> fraction
+ */
+ public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
+ final ObjectMapper mapper = new ObjectMapper();
+
+ final int numElements = 1024;
+ final long[] longs = new long[numElements];
+ final String[] strings = new String[numElements];
+ final boolean[] booleans = new boolean[numElements];
+ final double[] doubles = new double[numElements];
+
+ final byte[][] serializedJson = new byte[numElements][];
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ for (int i = 0; i < numElements; i++) {
+ longs[i] = random.nextLong();
+ strings[i] = Integer.toHexString(random.nextInt());
+ booleans[i] = random.nextBoolean();
+ doubles[i] = random.nextDouble();
+
+ ObjectNode entry = mapper.createObjectNode();
+ entry.put("long", longs[i]);
+ entry.put("string", strings[i]);
+ entry.put("boolean", booleans[i]);
+ entry.put("double", doubles[i]);
+
+ serializedJson[i] = mapper.writeValueAsBytes(entry);
+ }
+
+ // Produce serialized JSON data
+ createTestTopic(topic, 1, 1);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createRemoteEnvironment("localhost", flinkPort);
+ env.getConfig().disableSysoutLogging();
+
+ env.addSource(new SourceFunction<byte[]>() {
+ @Override
+ public void run(SourceContext<byte[]> ctx) throws Exception {
+ for (int i = 0; i < numElements; i++) {
+ ctx.collect(serializedJson[i]);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).addSink(kafkaServer.getProducer(
+ topic,
+ new ByteArraySerializationSchema(),
+ standardProps,
+ null));
+
+ // execute() blocks until the bounded producer job finishes
+ env.execute();
+
+ // Register as table source
+ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+ tableEnvironment.registerTableSource("kafka", kafkaTableSource);
+
+ Table result = tableEnvironment.ingest("kafka");
+
+ tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
+
+ int i = 0;
+
+ @Override
+ public void invoke(Row value) throws Exception {
+ assertEquals(5, value.productArity());
+ assertEquals(longs[i], value.productElement(0));
+ assertEquals(strings[i], value.productElement(1));
+ assertEquals(booleans[i], value.productElement(2));
+ assertEquals(doubles[i], value.productElement(3));
+ assertNull(value.productElement(4));
+
+ if (i == numElements-1) {
+ throw new SuccessException();
+ } else {
+ i++;
+ }
+ }
+ });
+
+ tryExecutePropagateExceptions(env, "KafkaTableSource");
+ }
+
+ /**
+ * Serialization scheme forwarding byte[] records.
+ */
+ private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {
+
+ @Override
+ public byte[] serializeKey(byte[] element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(byte[] element) {
+ return element;
+ }
+
+ @Override
+ public String getTargetTopic(byte[] element) {
+ return null;
+ }
+ }
+
private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {