Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/03 07:38:45 UTC

[2/2] flink git commit: [FLINK-8979] [test] Include shuffles in Kafka end-to-end tests

[FLINK-8979] [test] Include shuffles in Kafka end-to-end tests

This closes #5778.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0d366e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0d366e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0d366e

Branch: refs/heads/master
Commit: 6d0d366ebc166a69534629af29ae9455427c5912
Parents: 0821f49
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Mar 27 21:14:35 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Apr 3 15:37:27 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/test_streaming_kafka010.sh     | 40 ++++++++---
 .../examples/kafka/Kafka010Example.java         | 65 ++++++++++-------
 .../streaming/examples/kafka/KafkaEvent.java    | 74 ++++++++++++++++++++
 .../examples/kafka/KafkaEventSchema.java        | 53 ++++++++++++++
 4 files changed, 194 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index e09be35..0c318b6 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -23,11 +23,19 @@ source "$(dirname "$0")"/kafka-common.sh
 setup_kafka_dist
 start_kafka_cluster
 
+# modify configuration to have enough slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
+
 start_cluster
 
 function test_cleanup {
   stop_kafka_cluster
 
+  # revert our modifications to the Flink distribution
+  rm $FLINK_DIR/conf/flink-conf.yaml
+  mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
   # make sure to run regular cleanup as well
   cleanup
 }
@@ -44,15 +52,25 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
   --prefix=PREFIX \
   --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
 
+function verify_output {
+  local expected=$(printf "$1")
+
+  if [[ "$2" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$2--"
+    PASS=""
+    exit 1
+  fi
+}
+
 # send some data to Kafka
-send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input
-DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output)
-
-# make sure we have actual newlines in the string, not "\n"
-EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")
-if [[ "$DATA_FROM_KAFKA" != "$EXPECTED" ]]; then
-  echo "Output from Flink program does not match expected output."
-  echo -e "EXPECTED: --$EXPECTED--"
-  echo -e "ACTUAL: --$DATA_FROM_KAFKA--"
-  PASS=""
-fi
+send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output | grep bee)
+
+# check all keys; make sure we have actual newlines in the string, not "\n"
+verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
+verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
+verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"

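A note on the expected values checked above: for every input event, the job emits the running
total of frequencies seen so far for that event's key, so each key produces two output records.
A minimal sketch of that derivation in plain Java (independent of Flink; the class name is made
up for illustration):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: derive the expected test outputs from the test input by
    // accumulating a running frequency total per key.
    public class ExpectedOutputs {
        public static void main(String[] args) {
            String[] input = {
                "elephant,5,45218", "squirrel,12,46213", "bee,3,51348",
                "squirrel,22,52444", "bee,10,53412", "elephant,9,54867"
            };
            Map<String, Integer> totals = new HashMap<>();
            for (String line : input) {
                String[] parts = line.split(",");
                // merge() adds this frequency to the key's running total and returns it
                int total = totals.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
                System.out.println(parts[0] + "," + total + "," + parts[2]);
            }
            // Second record per key: elephant,14,54867  squirrel,34,52444  bee,13,53412
        }
    }
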
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
index 881aa67..62bfd4f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -17,10 +17,12 @@
 
 package org.apache.flink.streaming.examples.kafka;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,12 +36,13 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 import javax.annotation.Nullable;
 
 /**
- * An example that shows how to read from and write to Kafka. This will read String messages
- * from the input topic, prefix them by a configured prefix and output to the output topic.
+ * A simple example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group them by key, and
+ * finally perform a rolling addition on each key, writing the results back to another topic.
  *
  * <p>This example also demonstrates using a watermark assigner to generate per-partition
  * watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
- * the String messages are of formatted as a (message,timestamp) tuple.
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
  *
  * <p>Example usage:
  * 	--input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
@@ -54,54 +57,63 @@ public class Kafka010Example {
 			System.out.println("Missing parameters!\n" +
 					"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
 					"--bootstrap.servers <kafka brokers> " +
-					"--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]");
+					"--zookeeper.connect <zk quorum> --group.id <some id>");
 			return;
 		}
 
-		String prefix = parameterTool.get("prefix", "PREFIX:");
-
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
 		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
 		env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
-
-		// make parameters available in the web interface
-		env.getConfig().setGlobalJobParameters(parameterTool);
-
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-		DataStream<String> input = env
+		DataStream<KafkaEvent> input = env
 				.addSource(
 					new FlinkKafkaConsumer010<>(
 						parameterTool.getRequired("input-topic"),
-						new SimpleStringSchema(),
+						new KafkaEventSchema(),
 						parameterTool.getProperties())
 					.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
-				.map(new PrefixingMapper(prefix));
+				.keyBy("word")
+				.map(new RollingAdditionMapper());
 
 		input.addSink(
 				new FlinkKafkaProducer010<>(
 						parameterTool.getRequired("output-topic"),
-						new SimpleStringSchema(),
+						new KafkaEventSchema(),
 						parameterTool.getProperties()));
 
 		env.execute("Kafka 0.10 Example");
 	}
 
-	private static class PrefixingMapper implements MapFunction<String, String> {
+	/**
+	 * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
+	 * The current total count is keyed state managed by Flink.
+	 */
+	private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
 
 		private static final long serialVersionUID = 1180234853172462378L;
 
-		private final String prefix;
+		private transient ValueState<Integer> currentTotalCount;
+
+		@Override
+		public KafkaEvent map(KafkaEvent event) throws Exception {
+			Integer totalCount = currentTotalCount.value();
+
+			if (totalCount == null) {
+				totalCount = 0;
+			}
+			totalCount += event.getFrequency();
+
+			currentTotalCount.update(totalCount);
 
-		public PrefixingMapper(String prefix) {
-			this.prefix = prefix;
+			return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
 		}
 
 		@Override
-		public String map(String value) throws Exception {
-			return prefix + value;
+		public void open(Configuration parameters) throws Exception {
+			currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
 		}
 	}
 
@@ -112,18 +124,17 @@ public class Kafka010Example {
 	 * <p>Flink also ships some built-in convenience assigners, such as the
 	 * {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
 	 */
-	private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<String> {
+	private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
 
 		private static final long serialVersionUID = -742759155861320823L;
 
 		private long currentTimestamp = Long.MIN_VALUE;
 
 		@Override
-		public long extractTimestamp(String element, long previousElementTimestamp) {
+		public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
 			// the inputs are assumed to be of the format (word,frequency,timestamp)
-			long timestamp = Long.valueOf(element.substring(element.indexOf(",") + 1));
-			this.currentTimestamp = timestamp;
-			return timestamp;
+			this.currentTimestamp = event.getTimestamp();
+			return event.getTimestamp();
 		}
 
 		@Nullable

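The hunk above only changes extractTimestamp; getCurrentWatermark is untouched and therefore
cut off by the diff context. For a periodic assigner of this shape, the emission side typically
looks like the following sketch (an illustration of the pattern, not the file's actual body):

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Emit a watermark just behind the last seen timestamp, so an element
        // carrying exactly that timestamp is not treated as late.
        return new Watermark(currentTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentTimestamp - 1);
    }
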
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
new file mode 100644
index 0000000..a144fc3
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * <p>This is a Java POJO, which Flink recognizes as such, allowing "by-name" field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} of this type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
+
+	private String word;
+	private int frequency;
+	private long timestamp;
+
+	public KafkaEvent() {}
+
+	public KafkaEvent(String word, int frequency, long timestamp) {
+		this.word = word;
+		this.frequency = frequency;
+		this.timestamp = timestamp;
+	}
+
+	public String getWord() {
+		return word;
+	}
+
+	public void setWord(String word) {
+		this.word = word;
+	}
+
+	public int getFrequency() {
+		return frequency;
+	}
+
+	public void setFrequency(int frequency) {
+		this.frequency = frequency;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public static KafkaEvent fromString(String eventStr) {
+		String[] split = eventStr.split(",");
+		return new KafkaEvent(split[0], Integer.valueOf(split[1]), Long.valueOf(split[2]));
+	}
+
+	@Override
+	public String toString() {
+		return word + "," + frequency + "," + timestamp;
+	}
+}

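KafkaEvent satisfies Flink's POJO rules (a public no-argument constructor plus public getters
and setters for every field), which is what makes the keyBy("word") call in Kafka010Example
legal. A quick round-trip check of the string format, as a sketch (hypothetical test class in
the same package, not part of the commit):

    // Sketch: round-trip through the (word,frequency,timestamp) string format.
    public class KafkaEventRoundTrip {
        public static void main(String[] args) {
            KafkaEvent event = KafkaEvent.fromString("elephant,5,45218");
            System.out.println(event.getWord());      // elephant
            System.out.println(event.getFrequency()); // 5
            System.out.println(event.getTimestamp()); // 45218
            // toString() re-serializes to the identical wire format
            System.out.println(event);                // elephant,5,45218
        }
    }
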
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
new file mode 100644
index 0000000..5b8e17d
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
+ * Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
+ */
+public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {
+
+	private static final long serialVersionUID = 6154188370181669758L;
+
+	@Override
+	public byte[] serialize(KafkaEvent event) {
+		return event.toString().getBytes();
+	}
+
+	@Override
+	public KafkaEvent deserialize(byte[] message) throws IOException {
+		return KafkaEvent.fromString(new String(message));
+	}
+
+	@Override
+	public boolean isEndOfStream(KafkaEvent nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<KafkaEvent> getProducedType() {
+		return TypeInformation.of(KafkaEvent.class);
+	}
+}
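
One subtlety in the schema above: serialize and deserialize go through String#getBytes() and
new String(byte[]), which use the platform default charset. That is fine for a single-machine
end-to-end test; a charset-pinned variant would look like this sketch (a hypothetical
alternative for illustration, not what the commit does):

    import java.nio.charset.StandardCharsets;

    // Sketch: pin the wire encoding to UTF-8 instead of relying on the
    // platform default charset.
    public class Utf8KafkaEventBytes {
        static byte[] serialize(KafkaEvent event) {
            return event.toString().getBytes(StandardCharsets.UTF_8);
        }
        static KafkaEvent deserialize(byte[] message) {
            return KafkaEvent.fromString(new String(message, StandardCharsets.UTF_8));
        }
    }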