You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/03 07:38:45 UTC
[2/2] flink git commit: [FLINK-8979] [test] Include shuffles in Kafka
end-to-end tests
[FLINK-8979] [test] Include shuffles in Kafka end-to-end tests
This closes #5778.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0d366e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0d366e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0d366e
Branch: refs/heads/master
Commit: 6d0d366ebc166a69534629af29ae9455427c5912
Parents: 0821f49
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Mar 27 21:14:35 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Apr 3 15:37:27 2018 +0800
----------------------------------------------------------------------
.../test-scripts/test_streaming_kafka010.sh | 40 ++++++++---
.../examples/kafka/Kafka010Example.java | 65 ++++++++++-------
.../streaming/examples/kafka/KafkaEvent.java | 74 ++++++++++++++++++++
.../examples/kafka/KafkaEventSchema.java | 53 ++++++++++++++
4 files changed, 194 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index e09be35..0c318b6 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -23,11 +23,19 @@ source "$(dirname "$0")"/kafka-common.sh
setup_kafka_dist
start_kafka_cluster
+# modify configuration to have enough slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml
+
start_cluster
function test_cleanup {
stop_kafka_cluster
+ # revert our modifications to the Flink distribution
+ rm $FLINK_DIR/conf/flink-conf.yaml
+ mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
# make sure to run regular cleanup as well
cleanup
}
@@ -44,15 +52,25 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
--prefix=PREFIX \
--bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
+function verify_output {
+ local expected=$(printf '%b' "$1") # $1 = expected text; '%b' turns its "\n" escapes into real newlines
+
+ if [[ "$2" != "$expected" ]]; then # $2 = actual output to compare against the expected text
+ echo "Output from Flink program does not match expected output."
+ echo -e "EXPECTED FOR KEY: --$expected--"
+ echo -e "ACTUAL: --$2--"
+ PASS=""
+ exit 1
+ fi
+}
+
# send some data to Kafka
-send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input
-DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output)
-
-# make sure we have actual newlines in the string, not "\n"
-EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")
-if [[ "$DATA_FROM_KAFKA" != "$EXPECTED" ]]; then
- echo "Output from Flink program does not match expected output."
- echo -e "EXPECTED: --$EXPECTED--"
- echo -e "ACTUAL: --$DATA_FROM_KAFKA--"
- PASS=""
-fi
+send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output | grep bee)
+
+# check all keys; make sure we have actual newlines in the string, not "\n"
+verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
+verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
+verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
index 881aa67..62bfd4f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -17,10 +17,12 @@
package org.apache.flink.streaming.examples.kafka;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,12 +36,13 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import javax.annotation.Nullable;
/**
- * An example that shows how to read from and write to Kafka. This will read String messages
- * from the input topic, prefix them by a configured prefix and output to the output topic.
+ * A simple example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key for which the results are written back to another topic.
*
* <p>This example also demonstrates using a watermark assigner to generate per-partition
* watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
- * the String messages are of formatted as a (message,timestamp) tuple.
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
*
* <p>Example usage:
* --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
@@ -54,54 +57,63 @@ public class Kafka010Example {
System.out.println("Missing parameters!\n" +
"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
- "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]");
+ "--zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
- String prefix = parameterTool.get("prefix", "PREFIX:");
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
-
- // make parameters available in the web interface
- env.getConfig().setGlobalJobParameters(parameterTool);
-
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- DataStream<String> input = env
+ DataStream<KafkaEvent> input = env
.addSource(
new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-topic"),
- new SimpleStringSchema(),
+ new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
- .map(new PrefixingMapper(prefix));
+ .keyBy("word")
+ .map(new RollingAdditionMapper());
input.addSink(
new FlinkKafkaProducer010<>(
parameterTool.getRequired("output-topic"),
- new SimpleStringSchema(),
+ new KafkaEventSchema(),
parameterTool.getProperties()));
env.execute("Kafka 0.10 Example");
}
- private static class PrefixingMapper implements MapFunction<String, String> {
+ /**
+ * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key.
+ * The current total count is keyed state managed by Flink.
+ */
+ private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
private static final long serialVersionUID = 1180234853172462378L;
- private final String prefix;
+ private transient ValueState<Integer> currentTotalCount;
+
+ @Override
+ public KafkaEvent map(KafkaEvent event) throws Exception {
+ Integer totalCount = currentTotalCount.value();
+
+ if (totalCount == null) {
+ totalCount = 0;
+ }
+ totalCount += event.getFrequency();
+
+ currentTotalCount.update(totalCount);
- public PrefixingMapper(String prefix) {
- this.prefix = prefix;
+ return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
}
@Override
- public String map(String value) throws Exception {
- return prefix + value;
+ public void open(Configuration parameters) throws Exception {
+ currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
}
}
@@ -112,18 +124,17 @@ public class Kafka010Example {
* <p>Flink also ships some built-in convenience assigners, such as the
* {@link BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor}
*/
- private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<String> {
+ private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
private static final long serialVersionUID = -742759155861320823L;
private long currentTimestamp = Long.MIN_VALUE;
@Override
- public long extractTimestamp(String element, long previousElementTimestamp) {
+ public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
// the inputs are assumed to be of format (message,timestamp)
- long timestamp = Long.valueOf(element.substring(element.indexOf(",") + 1));
- this.currentTimestamp = timestamp;
- return timestamp;
+ this.currentTimestamp = event.getTimestamp();
+ return event.getTimestamp();
}
@Nullable
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
new file mode 100644
index 0000000..a144fc3
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+/**
+ * The event type used in the {@link Kafka010Example}.
+ *
+ * <p>This is a Java POJO, which Flink recognizes and will allow "by-name" field referencing
+ * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} of such a type.
+ * For a demonstration of this, see the code in {@link Kafka010Example}.
+ */
+public class KafkaEvent {
+
+ private String word;
+ private int frequency;
+ private long timestamp;
+
+ public KafkaEvent() {}
+
+ public KafkaEvent(String word, int frequency, long timestamp) {
+ this.word = word;
+ this.frequency = frequency;
+ this.timestamp = timestamp;
+ }
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+
+ public int getFrequency() {
+ return frequency;
+ }
+
+ public void setFrequency(int frequency) {
+ this.frequency = frequency;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public static KafkaEvent fromString(String eventStr) {
+ String[] split = eventStr.split(","); // expected layout: word,frequency,timestamp
+ return new KafkaEvent(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2])); // parse straight to primitives, no boxing
+ }
+
+ @Override
+ public String toString() {
+ return word + "," + frequency + "," + timestamp; // same comma-separated layout that fromString parses
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0d366e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
new file mode 100644
index 0000000..5b8e17d
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaEventSchema.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * The serialization schema for the {@link KafkaEvent} type. A record is the UTF-8 encoding of the
+ * event's {@code toString()} form; the charset is explicit so both directions are platform-independent.
+ */
+public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {
+
+ private static final long serialVersionUID = 6154188370181669758L;
+
+ @Override
+ public byte[] serialize(KafkaEvent event) {
+ return event.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); // not the JVM default charset
+ }
+
+ @Override
+ public KafkaEvent deserialize(byte[] message) throws IOException {
+ return KafkaEvent.fromString(new String(message, java.nio.charset.StandardCharsets.UTF_8)); // mirrors serialize
+ }
+
+ @Override
+ public boolean isEndOfStream(KafkaEvent nextElement) {
+ return false; // no event is treated as an end-of-stream marker
+ }
+
+ @Override
+ public TypeInformation<KafkaEvent> getProducedType() {
+ return TypeInformation.of(KafkaEvent.class);
+ }
+}