You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/06 09:36:51 UTC

[GitHub] pnowojski closed pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

pnowojski closed pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index cfc92c00131..617f1232e35 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -52,6 +52,10 @@ under the License.
 					<groupId>org.apache.kafka</groupId>
 					<artifactId>kafka_${scala.binary.version}</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka-clients</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 89228b39271..e1adab537d9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -187,6 +187,34 @@ under the License.
 			</excludes>
 		</fileSet>
 
+		<!-- copy jar files of the streaming kafka examples -->
+		<fileSet>
+			<directory>../flink-examples/flink-examples-streaming-kafka/target</directory>
+			<outputDirectory>examples/streaming</outputDirectory>
+			<fileMode>0644</fileMode>
+			<includes>
+				<include>*.jar</include>
+			</includes>
+			<excludes>
+				<exclude>flink-examples-streaming-kafka*.jar</exclude>
+				<exclude>original-*.jar</exclude>
+			</excludes>
+		</fileSet>
+
+		<!-- copy jar files of the streaming kafka 0.10 examples -->
+		<fileSet>
+			<directory>../flink-examples/flink-examples-streaming-kafka-0.10/target</directory>
+			<outputDirectory>examples/streaming</outputDirectory>
+			<fileMode>0644</fileMode>
+			<includes>
+				<include>*.jar</include>
+			</includes>
+			<excludes>
+				<exclude>flink-examples-streaming-kafka*.jar</exclude>
+				<exclude>original-*.jar</exclude>
+			</excludes>
+		</fileSet>
+
 		<!-- copy jar files of the gelly examples -->
 		<fileSet>
 			<directory>../flink-libraries/flink-gelly-examples/target</directory>
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh
index 15b9b152f77..3e3494a491a 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -54,6 +54,7 @@ run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/
 run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh"
 run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh"
 run_test "Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
+run_test "Modern Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka.sh"
 run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
 run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
 run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 2dc58f7305a..dedfe5208b1 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -17,6 +17,8 @@
 # limitations under the License.
 ################################################################################
 
+set -e
+set -u
 set -o pipefail
 
 if [[ -z $TEST_DATA_DIR ]]; then
@@ -24,15 +26,19 @@ if [[ -z $TEST_DATA_DIR ]]; then
   exit 1
 fi
 
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
-CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0
+KAFKA_VERSION="$1"
+CONFLUENT_VERSION="$2"
+CONFLUENT_MAJOR_VERSION="$3"
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-$KAFKA_VERSION
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-$CONFLUENT_VERSION
 SCHEMA_REGISTRY_PORT=8082
 SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
 
 function setup_kafka_dist {
   # download Kafka
   mkdir -p $TEST_DATA_DIR
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  KAFKA_URL="https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.11-$KAFKA_VERSION.tgz"
   echo "Downloading Kafka from $KAFKA_URL"
   curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
 
@@ -46,7 +52,7 @@ function setup_kafka_dist {
 function setup_confluent_dist {
   # download confluent
   mkdir -p $TEST_DATA_DIR
-  CONFLUENT_URL="http://packages.confluent.io/archive/3.2/confluent-oss-3.2.0-2.11.tar.gz"
+  CONFLUENT_URL="http://packages.confluent.io/archive/$CONFLUENT_MAJOR_VERSION/confluent-oss-$CONFLUENT_VERSION-2.11.tar.gz"
   echo "Downloading confluent from $CONFLUENT_URL"
   curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz
 
@@ -76,13 +82,15 @@ function stop_kafka_cluster {
   $KAFKA_DIR/bin/kafka-server-stop.sh
   $KAFKA_DIR/bin/zookeeper-server-stop.sh
 
-  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+  # Terminate Kafka process if it still exists
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}'|| echo "")
 
   if [ ! -z "$PIDS" ]; then
     kill -s TERM $PIDS
   fi
 
-  PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+  # Terminate QuorumPeerMain process if it still exists
+  PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}'|| echo "")
 
   if [ ! -z "$PIDS" ]; then
     kill -s TERM $PIDS
@@ -120,14 +128,7 @@ function get_partition_end_offset {
   local topic=$1
   local partition=$2
 
-  # first, use the console consumer to produce a dummy consumer group
-  read_messages_from_kafka 0 $topic dummy-consumer
-
-  # then use the consumer offset utility to get the LOG_END_OFFSET value for the specified partition
-  $KAFKA_DIR/bin/kafka-consumer-groups.sh --describe --group dummy-consumer --bootstrap-server localhost:9092 2> /dev/null \
-    | grep "$topic \+$partition" \
-    | tr -s " " \
-    | cut -d " " -f 4
+  $KAFKA_DIR/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $topic --partitions $partition --time -1 | cut -d ":" -f 3
 }
 
 function start_confluent_schema_registry {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
new file mode 100755
index 00000000000..c5cdfde3dc0
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+set -u
+set -o pipefail
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh 2.0.0 5.0.0 5.0
+
+source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/KafkaExample.jar
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index c9cc19db71d..ecd651448a9 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -18,96 +18,7 @@
 ################################################################################
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
+source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2
 
-setup_kafka_dist
-start_kafka_cluster
+source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/Kafka010Example.jar
 
-# modify configuration to have enough slots
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
-sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
-
-start_cluster
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_kafka_cluster
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
-
-# create the required topics
-create_kafka_topic 1 1 test-input
-create_kafka_topic 1 1 test-output
-
-# run the Flink job (detached mode)
-$FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
-  --input-topic test-input --output-topic test-output \
-  --prefix=PREFIX \
-  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
-  --flink.partition-discovery.interval-millis 1000
-
-function verify_output {
-  local expected=$(printf $1)
-
-  if [[ "$2" != "$expected" ]]; then
-    echo "Output from Flink program does not match expected output."
-    echo -e "EXPECTED FOR KEY: --$expected--"
-    echo -e "ACTUAL: --$2--"
-    exit 1
-  fi
-}
-
-echo "Sending messages to Kafka topic [test-input] ..."
-# send some data to Kafka
-send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
-
-echo "Verifying messages from Kafka topic [test-output] ..."
-
-KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
-
-# check all keys; make sure we have actual newlines in the string, not "\n"
-verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
-verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
-verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
-
-# now, we add a new partition to the topic
-echo "Repartitioning Kafka topic [test-input] ..."
-modify_num_partitions test-input 2
-
-if (( $(get_num_partitions test-input) != 2 )); then
-  echo "Failed adding a partition to test-input topic."
-  exit 1
-fi
-
-# send some more messages to Kafka
-echo "Sending more messages to Kafka topic [test-input] ..."
-send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
-
-# verify that our assumption that the new partition actually has written messages is correct
-if (( $(get_partition_end_offset test-input 1) == 0 )); then
-  echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
-  exit 1
-fi
-
-# all new messages should have been consumed, and has produced correct output
-echo "Verifying messages from Kafka topic [test-output] ..."
-
-KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
-KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
-
-verify_output "elephant,27,64213" "$KEY_1_MSGS"
-verify_output "squirrel,52,66413" "$KEY_2_MSGS"
-verify_output "bee,18,65647" "$KEY_3_MSGS"
-verify_output "giraffe,9,65555" "$KEY_4_MSGS"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka_common.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka_common.sh
new file mode 100644
index 00000000000..ff3adc158c7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka_common.sh
@@ -0,0 +1,117 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+set -u
+set -o pipefail
+
+KAFKA_EXAMPLE_JAR="$1"
+
+setup_kafka_dist
+start_kafka_cluster
+
+# modify configuration to have enough slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
+
+start_cluster
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# create the required topics
+create_kafka_topic 1 1 test-input
+create_kafka_topic 1 1 test-output
+
+# run the Flink job (detached mode)
+$FLINK_DIR/bin/flink run -d $KAFKA_EXAMPLE_JAR \
+  --input-topic test-input --output-topic test-output \
+  --prefix=PREFIX \
+  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
+  --transaction.timeout.ms 900000 \
+  --flink.partition-discovery.interval-millis 1000
+
+function verify_output {
+  local expected=$(printf $1)
+
+  if [[ "$2" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$2--"
+    exit 1
+  fi
+}
+
+echo "Sending messages to Kafka topic [test-input] ..."
+# send some data to Kafka
+send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
+
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
+
+# check all keys; make sure we have actual newlines in the string, not "\n"
+verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
+verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
+verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
+
+# now, we add a new partition to the topic
+echo "Repartitioning Kafka topic [test-input] ..."
+modify_num_partitions test-input 2
+
+if (( $(get_num_partitions test-input) != 2 )); then
+  echo "Failed adding a partition to test-input topic."
+  exit 1
+fi
+
+# send some more messages to Kafka
+echo "Sending more messages to Kafka topic [test-input] ..."
+send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
+
+# verify that our assumption that the new partition actually has written messages is correct
+if (( $(get_partition_end_offset test-input 1) == 0 )); then
+  echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
+  exit 1
+fi
+
+# all new messages should have been consumed, and has produced correct output
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
+KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
+
+verify_output "elephant,27,64213" "$KEY_1_MSGS"
+verify_output "squirrel,52,66413" "$KEY_2_MSGS"
+verify_output "bee,18,65647" "$KEY_3_MSGS"
+verify_output "giraffe,9,65555" "$KEY_4_MSGS"
diff --git a/flink-examples/flink-examples-streaming-kafka-0.10/pom.xml b/flink-examples/flink-examples-streaming-kafka-0.10/pom.xml
new file mode 100644
index 00000000000..ced3919da3e
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-0.10/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-examples</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-examples-streaming-kafka-0.10</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-streaming-kafka-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Use the shade plugin to build a fat jar for the kafka connector test -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>fat-jar-kafka-example</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.examples.kafka.Kafka010Example</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>Kafka010Example</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-examples/flink-examples-streaming-kafka-0.10/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
new file mode 100644
index 00000000000..2df1f5d8496
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+import org.apache.flink.streaming.examples.kafka.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEvent;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEventSchema;
+import org.apache.flink.streaming.examples.kafka.base.KafkaExampleUtil;
+import org.apache.flink.streaming.examples.kafka.base.RollingAdditionMapper;
+
+/**
+ * A simple example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key for which the results are written back to another topic.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
+ * the String messages are of formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ * 	--input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class Kafka010Example {
+
+	public static void main(String[] args) throws Exception {
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+		StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+		DataStream<KafkaEvent> input = env
+				.addSource(
+					new FlinkKafkaConsumer010<>(
+						parameterTool.getRequired("input-topic"),
+						new KafkaEventSchema(),
+						parameterTool.getProperties())
+					.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
+				.keyBy("word")
+				.map(new RollingAdditionMapper());
+
+		input.addSink(
+				new FlinkKafkaProducer010<>(
+						parameterTool.getRequired("output-topic"),
+						new KafkaEventSchema(),
+						parameterTool.getProperties()));
+
+		env.execute("Kafka 0.10 Example");
+	}
+
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
similarity index 97%
rename from flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
rename to flink-examples/flink-examples-streaming-kafka-0.10/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
index 9f4fdc4c294..c2ea5617a68 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
+++ b/flink-examples/flink-examples-streaming-kafka-0.10/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.scala.examples.kafka
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.common.serialization.SimpleStringSchema
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
 
 /**
diff --git a/flink-examples/flink-examples-streaming-kafka-base/pom.xml b/flink-examples/flink-examples-streaming-kafka-base/pom.xml
new file mode 100644
index 00000000000..3f389f843a1
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-base/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-examples</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-examples-streaming-kafka-base</artifactId>
+	<name>flink-examples-streaming-kafka-base</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/CustomWatermarkExtractor.java b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/CustomWatermarkExtractor.java
new file mode 100644
index 00000000000..51de582dc0f
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/CustomWatermarkExtractor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka.base;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A custom {@link AssignerWithPeriodicWatermarks}, that simply assumes that the input stream
+ * records are strictly ascending.
+ *
+ * <p>Flink also ships some built-in convenience assigners, such as the
+ * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
+ */
+public class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
+
+	private static final long serialVersionUID = -742759155861320823L;
+
+	private long currentTimestamp = Long.MIN_VALUE;
+
+	@Override
+	public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
+		// the inputs are assumed to be of format (message,timestamp)
+		this.currentTimestamp = event.getTimestamp();
+		return event.getTimestamp();
+	}
+
+	@Nullable
+	@Override
+	public Watermark getCurrentWatermark() {
+		return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEvent.java
similarity index 97%
rename from flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
rename to flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEvent.java
index a144fc38257..7a8f84f8ca8 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
+++ b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEvent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.examples.kafka;
+package org.apache.flink.streaming.examples.kafka.base;
 
 /**
  * The event type used in the {@link Kafka010Example}.
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEventSchema.java
similarity index 97%
rename from flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
rename to flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEventSchema.java
index 5b8e17dadca..ea9c12b6056 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
+++ b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaEventSchema.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.examples.kafka;
+package org.apache.flink.streaming.examples.kafka.base;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git a/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaExampleUtil.java b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaExampleUtil.java
new file mode 100644
index 00000000000..447dec24648
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/KafkaExampleUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka.base;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The util class for kafka example.
+ */
+public class KafkaExampleUtil {
+
+	public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool parameterTool)
+		throws Exception {
+
+		if (parameterTool.getNumberOfParameters() < 5) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+				"--bootstrap.servers <kafka brokers> " +
+				"--zookeeper.connect <zk quorum> --group.id <some id>");
+			throw new Exception("Missing parameters!\n" +
+				"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+				"--bootstrap.servers <kafka brokers> " +
+				"--zookeeper.connect <zk quorum> --group.id <some id>");
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+		env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		return env;
+	}
+
+}
diff --git a/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/RollingAdditionMapper.java b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/RollingAdditionMapper.java
new file mode 100644
index 00000000000..e71f86cefba
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka-base/src/main/java/org/apache/flink/streaming/examples/kafka/base/RollingAdditionMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka.base;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
+ * The current total count is keyed state managed by Flink.
+ */
+public class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
+
+	private static final long serialVersionUID = 1180234853172462378L;
+
+	private transient ValueState<Integer> currentTotalCount;
+
+	@Override
+	public KafkaEvent map(KafkaEvent event) throws Exception {
+		Integer totalCount = currentTotalCount.value();
+
+		if (totalCount == null) {
+			totalCount = 0;
+		}
+		totalCount += event.getFrequency();
+
+		currentTotalCount.update(totalCount);
+
+		return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
+	}
+}
diff --git a/flink-examples/flink-examples-streaming-kafka/pom.xml b/flink-examples/flink-examples-streaming-kafka/pom.xml
new file mode 100644
index 00000000000..0434460e981
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-examples</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-examples-streaming-kafka</artifactId>
+	<name>flink-examples-streaming-kafka</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-streaming-kafka-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Use the shade plugin to build a fat jar for the kafka connector test -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>fat-jar-kafka-example</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.examples.kafka.KafkaExample</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>KafkaExample</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-examples/flink-examples-streaming-kafka/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java b/flink-examples/flink-examples-streaming-kafka/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
new file mode 100644
index 00000000000..27e73d18c08
--- /dev/null
+++ b/flink-examples/flink-examples-streaming-kafka/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.examples.kafka.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEvent;
+import org.apache.flink.streaming.examples.kafka.base.KafkaEventSchema;
+import org.apache.flink.streaming.examples.kafka.base.KafkaExampleUtil;
+import org.apache.flink.streaming.examples.kafka.base.RollingAdditionMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+/**
+ * A simple example that shows how to read from and write to modern Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key for which the results are written back to another topic.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
+ * the String messages are of formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ * 	--input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092
+ * 	--zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class KafkaExample extends KafkaExampleUtil {
+
+	public static void main(String[] args) throws Exception {
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+		StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+		DataStream<KafkaEvent> input = env
+			.addSource(
+				new FlinkKafkaConsumer<>(
+					parameterTool.getRequired("input-topic"),
+					new KafkaEventSchema(),
+					parameterTool.getProperties())
+					.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
+			.keyBy("word")
+			.map(new RollingAdditionMapper());
+
+		input.addSink(
+			new FlinkKafkaProducer<>(
+				parameterTool.getRequired("output-topic"),
+				new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
+				parameterTool.getProperties(),
+				FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
+
+		env.execute("Modern Kafka Example");
+	}
+
+}
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index 180a6cbe9d3..30347abfcb7 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -531,43 +531,6 @@ under the License.
 						</configuration>
 					</execution>
 
-					<execution>
-						<id>fat-jar-kafka-010-example</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<shadeTestJar>false</shadeTestJar>
-							<shadedArtifactAttached>false</shadedArtifactAttached>
-							<createDependencyReducedPom>false</createDependencyReducedPom>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.streaming.examples.kafka.Kafka010Example</mainClass>
-								</transformer>
-							</transformers>
-							<finalName>Kafka010Example</finalName>
-							<!-- <outputFile>Kafka.jar</outputFile> -->
-							<filters>
-								<filter>
-									<artifact>*</artifact>
-									<includes>
-										<include>org/apache/flink/streaming/examples/kafka/**</include>
-										<include>org/apache/flink/streaming/**</include>
-										<include>org/apache/kafka/**</include>
-										<include>org/apache/curator/**</include>
-										<include>org/apache/zookeeper/**</include>
-										<include>org/apache/jute/**</include>
-										<include>org/I0Itec/**</include>
-										<include>jline/**</include>
-										<include>com/yammer/**</include>
-										<include>kafka/**</include>
-									</includes>
-								</filter>
-							</filters>
-						</configuration>
-					</execution>
-
 					<execution>
 						<id>fat-jar-twitter-example</id>
 						<phase>package</phase>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
deleted file mode 100644
index 62bfd4fc6c5..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.kafka;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
-
-import javax.annotation.Nullable;
-
-/**
- * A simple example that shows how to read from and write to Kafka. This will read String messages
- * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
- * perform a rolling addition on each key for which the results are written back to another topic.
- *
- * <p>This example also demonstrates using a watermark assigner to generate per-partition
- * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
- * the String messages are of formatted as a (word,frequency,timestamp) tuple.
- *
- * <p>Example usage:
- * 	--input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
- */
-public class Kafka010Example {
-
-	public static void main(String[] args) throws Exception {
-		// parse input arguments
-		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-		if (parameterTool.getNumberOfParameters() < 5) {
-			System.out.println("Missing parameters!\n" +
-					"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
-					"--bootstrap.servers <kafka brokers> " +
-					"--zookeeper.connect <zk quorum> --group.id <some id>");
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
-		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
-		env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-		DataStream<KafkaEvent> input = env
-				.addSource(
-					new FlinkKafkaConsumer010<>(
-						parameterTool.getRequired("input-topic"),
-						new KafkaEventSchema(),
-						parameterTool.getProperties())
-					.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
-				.keyBy("word")
-				.map(new RollingAdditionMapper());
-
-		input.addSink(
-				new FlinkKafkaProducer010<>(
-						parameterTool.getRequired("output-topic"),
-						new KafkaEventSchema(),
-						parameterTool.getProperties()));
-
-		env.execute("Kafka 0.10 Example");
-	}
-
-	/**
-	 * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
-	 * The current total count is keyed state managed by Flink.
-	 */
-	private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
-
-		private static final long serialVersionUID = 1180234853172462378L;
-
-		private transient ValueState<Integer> currentTotalCount;
-
-		@Override
-		public KafkaEvent map(KafkaEvent event) throws Exception {
-			Integer totalCount = currentTotalCount.value();
-
-			if (totalCount == null) {
-				totalCount = 0;
-			}
-			totalCount += event.getFrequency();
-
-			currentTotalCount.update(totalCount);
-
-			return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
-		}
-	}
-
-	/**
-	 * A custom {@link AssignerWithPeriodicWatermarks}, that simply assumes that the input stream
-	 * records are strictly ascending.
-	 *
-	 * <p>Flink also ships some built-in convenience assigners, such as the
-	 * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
-	 */
-	private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
-
-		private static final long serialVersionUID = -742759155861320823L;
-
-		private long currentTimestamp = Long.MIN_VALUE;
-
-		@Override
-		public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
-			// the inputs are assumed to be of format (message,timestamp)
-			this.currentTimestamp = event.getTimestamp();
-			return event.getTimestamp();
-		}
-
-		@Nullable
-		@Override
-		public Watermark getCurrentWatermark() {
-			return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
-		}
-	}
-}
diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml
index 165130d3bdf..0905d01ed3b 100644
--- a/flink-examples/pom.xml
+++ b/flink-examples/pom.xml
@@ -35,6 +35,9 @@ under the License.
 	<modules>
 		<module>flink-examples-batch</module>
 		<module>flink-examples-streaming</module>
+		<module>flink-examples-streaming-kafka-base</module>
+		<module>flink-examples-streaming-kafka</module>
+		<module>flink-examples-streaming-kafka-0.10</module>
 		<module>flink-examples-table</module>
 	</modules>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services