Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/03 07:38:44 UTC

[1/2] flink git commit: [FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts

Repository: flink
Updated Branches:
  refs/heads/master aa88a425b -> 6d0d366eb


[FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0821f491
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0821f491
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0821f491

Branch: refs/heads/master
Commit: 0821f4918fc06249d2dea1fa4ef9b2a034b5a83b
Parents: aa88a42
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Mar 27 18:06:19 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Apr 3 15:37:04 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/kafka-common.sh                | 74 ++++++++++++++++++++
 .../test-scripts/test_resume_savepoint.sh       | 34 ++-------
 .../test-scripts/test_streaming_kafka010.sh     | 51 ++++----------
 3 files changed, 91 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0821f491/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
new file mode 100644
index 0000000..7f05357
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+    echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+    exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+    echo "Waiting for broker..."
+    sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor $1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning --max-messages $1 2> /dev/null
+}
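
For orientation, a minimal sketch of how a test script is expected to use these new helpers, mirroring the refactored scripts further down in this commit. The topic name and messages here are purely illustrative:

    source "$(dirname "$0")"/common.sh
    source "$(dirname "$0")"/kafka-common.sh

    setup_kafka_dist      # download Kafka 0.10.2.0 into $TEST_DATA_DIR and patch its configs
    start_kafka_cluster   # start ZooKeeper and the broker, wait until the broker is registered

    trap stop_kafka_cluster INT EXIT

    create_kafka_topic 1 1 my-topic                       # replication factor 1, 1 partition (hypothetical topic)
    send_messages_to_kafka "hello,1\nworld,2" my-topic    # echo -e turns the \n into real newlines
    read_messages_from_kafka 2 my-topic                   # read 2 messages from the beginning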

http://git-wip-us.apache.org/repos/asf/flink/blob/0821f491/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 83e0e5a..6642ad5 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -23,27 +23,10 @@ if [ -z $1 ] || [ -z $2 ]; then
 fi
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
 
-# get Kafka 0.10.0
-mkdir -p $TEST_DATA_DIR
-if [ -z "$3" ]; then
-  # need to download Kafka because no Kafka was specified on the invocation
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
-  echo "Downloading Kafka from $KAFKA_URL"
-  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
-else
-  echo "Using specified Kafka from $3"
-  cp $3 $TEST_DATA_DIR/kafka.tgz
-fi
-
-tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
-
-# fix kafka config
-sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
-sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
-$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
-$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+setup_kafka_dist
+start_kafka_cluster
 
 ORIGINAL_DOP=$1
 NEW_DOP=$2
@@ -67,8 +50,7 @@ start_cluster
 
 # make sure to stop Kafka and ZooKeeper at the end, as well as to clean up the Flink cluster and our modifications
 function test_cleanup {
-  $KAFKA_DIR/bin/kafka-server-stop.sh
-  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+  stop_kafka_cluster
 
   # revert our modifications to the Flink distribution
   rm $FLINK_DIR/conf/flink-conf.yaml
@@ -81,14 +63,8 @@ function test_cleanup {
 trap test_cleanup INT
 trap test_cleanup EXIT
 
-# zookeeper outputs the "Node does not exist" bit to stderr
-while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
-  echo "Waiting for broker..."
-  sleep 1
-done
-
 # create the required topic
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
+create_kafka_topic 1 1 test-input
 
 # run the state machine example job
 STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $FLINK_DIR/examples/streaming/StateMachineExample.jar \

http://git-wip-us.apache.org/repos/asf/flink/blob/0821f491/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index a6a9a8e..e09be35 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -18,51 +18,25 @@
 ################################################################################
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
 
-start_cluster
-
-# get Kafka 0.10.0
-mkdir -p $TEST_DATA_DIR
-if [ -z "$3" ]; then
-  # need to download Kafka because no Kafka was specified on the invocation
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
-  echo "Downloading Kafka from $KAFKA_URL"
-  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
-else
-  echo "Using specified Kafka from $3"
-  cp $3 $TEST_DATA_DIR/kafka.tgz
-fi
-
-tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+setup_kafka_dist
+start_kafka_cluster
 
-# fix kafka config
-sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
-sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
-$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
-$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
-
-# make sure to stop Kafka and ZooKeeper at the end
+start_cluster
 
-function kafka_cleanup {
-  $KAFKA_DIR/bin/kafka-server-stop.sh
-  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+function test_cleanup {
+  stop_kafka_cluster
 
   # make sure to run regular cleanup as well
   cleanup
 }
-trap kafka_cleanup INT
-trap kafka_cleanup EXIT
-
-# zookeeper outputs the "Node does not exist" bit to stderr
-while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
-  echo "Waiting for broker..."
-  sleep 1
-done
+trap test_cleanup INT
+trap test_cleanup EXIT
 
 # create the required topics
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-output
+create_kafka_topic 1 1 test-input
+create_kafka_topic 1 1 test-output
 
 # run the Flink job (detached mode)
 $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
@@ -71,9 +45,8 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
   --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
 
 # send some data to Kafka
-echo -e "hello,45218\nwhats,46213\nup,51348" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input
-
-DATA_FROM_KAFKA=$($KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-output --from-beginning --max-messages 3 2> /dev/null)
+send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input
+DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output)
 
 # make sure we have actual newlines in the string, not "\n"
 EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")


[2/2] flink git commit: [FLINK-8979] [test] Include shuffles in Kafka end-to-end tests

Posted by tz...@apache.org.
[FLINK-8979] [test] Include shuffles in Kafka end-to-end tests

This closes #5778.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0d366e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0d366e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0d366e

Branch: refs/heads/master
Commit: 6d0d366ebc166a69534629af29ae9455427c5912
Parents: 0821f49
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Mar 27 21:14:35 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Apr 3 15:37:27 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/test_streaming_kafka010.sh     | 40 ++++++++---
 .../examples/kafka/Kafka010Example.java         | 65 ++++++++++-------
 .../streaming/examples/kafka/KafkaEvent.java    | 74 ++++++++++++++++++++
 .../examples/kafka/KafkaEventSchema.java        | 53 ++++++++++++++
 4 files changed, 194 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index e09be35..0c318b6 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -23,11 +23,19 @@ source "$(dirname "$0")"/kafka-common.sh
 setup_kafka_dist
 start_kafka_cluster
 
+# modify configuration to have enough slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
+
 start_cluster
 
 function test_cleanup {
   stop_kafka_cluster
 
+  # revert our modifications to the Flink distribution
+  rm $FLINK_DIR/conf/flink-conf.yaml
+  mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
   # make sure to run regular cleanup as well
   cleanup
 }
@@ -44,15 +52,25 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
   --prefix=PREFIX \
   --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
 
+function verify_output {
+  local expected=$(printf $1)
+
+  if [[ "$2" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$2--"
+    PASS=""
+    exit 1
+  fi
+}
+
 # send some data to Kafka
-send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input
-DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output)
-
-# make sure we have actual newlines in the string, not "\n"
-EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")
-if [[ "$DATA_FROM_KAFKA" != "$EXPECTED" ]]; then
-  echo "Output from Flink program does not match expected output."
-  echo -e "EXPECTED: --$EXPECTED--"
-  echo -e "ACTUAL: --$DATA_FROM_KAFKA--"
-  PASS=""
-fi
+send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output | grep bee)
+
+# check all keys; make sure we have actual newlines in the string, not "\n"
+verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
+verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
+verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"

http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
index 881aa67..62bfd4f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -17,10 +17,12 @@
 
 package org.apache.flink.streaming.examples.kafka;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,12 +36,13 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 import javax.annotation.Nullable;
 
 /**
- * An example that shows how to read from and write to Kafka. This will read String messages
- * from the input topic, prefix them by a configured prefix and output to the output topic.
+ * A simple example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group them by a key, and finally
+ * perform a rolling addition on each key, writing the results back to another topic.
  *
  * <p>This example also demonstrates using a watermark assigner to generate per-partition
  * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
- * the String messages are of formatted as a (message,timestamp) tuple.
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
  *
  * <p>Example usage:
  * 	--input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
@@ -54,54 +57,63 @@ public class Kafka010Example {
 			System.out.println("Missing parameters!\n" +
 					"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
 					"--bootstrap.servers <kafka brokers> " +
-					"--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]");
+					"--zookeeper.connect <zk quorum> --group.id <some id>");
 			return;
 		}
 
-		String prefix = parameterTool.get("prefix", "PREFIX:");
-
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
 		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
 		env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
-
-		// make parameters available in the web interface
-		env.getConfig().setGlobalJobParameters(parameterTool);
-
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-		DataStream<String> input = env
+		DataStream<KafkaEvent> input = env
 				.addSource(
 					new FlinkKafkaConsumer010<>(
 						parameterTool.getRequired("input-topic"),
-						new SimpleStringSchema(),
+						new KafkaEventSchema(),
 						parameterTool.getProperties())
 					.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
-				.map(new PrefixingMapper(prefix));
+				.keyBy("word")
+				.map(new RollingAdditionMapper());
 
 		input.addSink(
 				new FlinkKafkaProducer010<>(
 						parameterTool.getRequired("output-topic"),
-						new SimpleStringSchema(),
+						new KafkaEventSchema(),
 						parameterTool.getProperties()));
 
 		env.execute("Kafka 0.10 Example");
 	}
 
-	private static class PrefixingMapper implements MapFunction<String, String> {
+	/**
+	 * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
+	 * The current total count is keyed state managed by Flink.
+	 */
+	private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
 
 		private static final long serialVersionUID = 1180234853172462378L;
 
-		private final String prefix;
+		private transient ValueState<Integer> currentTotalCount;
+
+		@Override
+		public KafkaEvent map(KafkaEvent event) throws Exception {
+			Integer totalCount = currentTotalCount.value();
+
+			if (totalCount == null) {
+				totalCount = 0;
+			}
+			totalCount += event.getFrequency();
+
+			currentTotalCount.update(totalCount);
 
-		public PrefixingMapper(String prefix) {
-			this.prefix = prefix;
+			return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
 		}
 
 		@Override
-		public String map(String value) throws Exception {
-			return prefix + value;
+		public void open(Configuration parameters) throws Exception {
+			currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
 		}
 	}
 
@@ -112,18 +124,17 @@ public class Kafka010Example {
 	 * <p>Flink also ships some built-in convenience assigners, such as the
 	 * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
 	 */
-	private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<String> {
+	private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
 
 		private static final long serialVersionUID = -742759155861320823L;
 
 		private long currentTimestamp = Long.MIN_VALUE;
 
 		@Override
-		public long extractTimestamp(String element, long previousElementTimestamp) {
+		public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
 			// the inputs are assumed to be of format (message,timestamp)
-			long timestamp = Long.valueOf(element.substring(element.indexOf(",") + 1));
-			this.currentTimestamp = timestamp;
-			return timestamp;
+			this.currentTimestamp = event.getTimestamp();
+			return event.getTimestamp();
 		}
 
 		@Nullable

http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
new file mode 100644
index 0000000..a144fc3
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * <p>This is a Java POJO, which Flink recognizes and will allow "by-name" field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} of such a type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
+
+	private String word;
+	private int frequency;
+	private long timestamp;
+
+	public KafkaEvent() {}
+
+	public KafkaEvent(String word, int frequency, long timestamp) {
+		this.word = word;
+		this.frequency = frequency;
+		this.timestamp = timestamp;
+	}
+
+	public String getWord() {
+		return word;
+	}
+
+	public void setWord(String word) {
+		this.word = word;
+	}
+
+	public int getFrequency() {
+		return frequency;
+	}
+
+	public void setFrequency(int frequency) {
+		this.frequency = frequency;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public static KafkaEvent fromString(String eventStr) {
+		String[] split = eventStr.split(",");
+		return new KafkaEvent(split[0], Integer.valueOf(split[1]), Long.valueOf(split[2]));
+	}
+
+	@Override
+	public String toString() {
+		return word + "," + frequency + "," + timestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
new file mode 100644
index 0000000..5b8e17d
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
+ * Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
+ */
+public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {
+
+	private static final long serialVersionUID = 6154188370181669758L;
+
+	@Override
+	public byte[] serialize(KafkaEvent event) {
+		return event.toString().getBytes();
+	}
+
+	@Override
+	public KafkaEvent deserialize(byte[] message) throws IOException {
+		return KafkaEvent.fromString(new String(message));
+	}
+
+	@Override
+	public boolean isEndOfStream(KafkaEvent nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<KafkaEvent> getProducedType() {
+		return TypeInformation.of(KafkaEvent.class);
+	}
+}