You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/18 12:11:09 UTC
[2/2] flink git commit: [FLINK-8990] [test] Test partition discovery
in Kafka end-to-end test
[FLINK-8990] [test] Test partition discovery in Kafka end-to-end test
This closes #5779.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d5325e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d5325e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d5325e2
Branch: refs/heads/release-1.5
Commit: 2d5325e2d4f4de26e58fc28507a673e223cd5e6d
Parents: aa54b87
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Mar 28 16:36:18 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Apr 18 20:11:00 2018 +0800
----------------------------------------------------------------------
.../test-scripts/kafka-common.sh | 27 ++++++++++-
.../test-scripts/test_streaming_kafka010.sh | 47 ++++++++++++++++++--
2 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d5325e2/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 7f05357..49ff4fe 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -70,5 +70,30 @@ function send_messages_to_kafka {
}
function read_messages_from_kafka {
- $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning --max-messages $1 2> /dev/null
+ $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
+ --max-messages $1 \
+ --topic $2 \
+ --consumer-property group.id=$3 2> /dev/null
+}
+
+function modify_num_partitions {
+ $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 --zookeeper localhost:2181
+}
+
+function get_num_partitions {
+ $KAFKA_DIR/bin/kafka-topics.sh --describe --topic $1 --zookeeper localhost:2181 | grep -Eo "PartitionCount:[0-9]+" | cut -d ":" -f 2
+}
+
+function get_partition_end_offset {
+ local topic=$1
+ local partition=$2
+
+ # first, use the console consumer to produce a dummy consumer group
+ read_messages_from_kafka 0 $topic dummy-consumer
+
+ # then use the consumer offset utility to get the LOG_END_OFFSET value for the specified partition
+ $KAFKA_DIR/bin/kafka-consumer-groups.sh --describe --group dummy-consumer --bootstrap-server localhost:9092 2> /dev/null \
+ | grep "$topic \+$partition" \
+ | tr -s " " \
+ | cut -d " " -f 4
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d5325e2/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index 4d7525f..abc6186 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -54,7 +54,8 @@ create_kafka_topic 1 1 test-output
$FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
--input-topic test-input --output-topic test-output \
--prefix=PREFIX \
- --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
+ --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
+ --flink.partition-discovery.interval-millis 1000
function verify_output {
local expected=$(printf $1)
@@ -68,13 +69,51 @@ function verify_output {
fi
}
+echo "Sending messages to Kafka topic [test-input] ..."
# send some data to Kafka
send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
-KEY_1_MSGS=$(read_messages_from_kafka 6 test-output | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 6 test-output | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 6 test-output | grep bee)
+
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
# check all keys; make sure we have actual newlines in the string, not "\n"
verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
+
+# now, we add a new partition to the topic
+echo "Repartitioning Kafka topic [test-input] ..."
+modify_num_partitions test-input 2
+
+if (( $(get_num_partitions test-input) != 2 )); then
+ echo "Failed adding a partition to test-input topic."
+ PASS=""
+ exit 1
+fi
+
+# send some more messages to Kafka
+echo "Sending more messages to Kafka topic [test-input] ..."
+send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
+
+# verify that our assumption that the new partition actually has written messages is correct
+if (( $(get_partition_end_offset test-input 1) == 0 )); then
+ echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
+ PASS=""
+ exit 1
+fi
+
+# all new messages should have been consumed, and has produced correct output
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
+KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
+
+verify_output "elephant,27,64213" "$KEY_1_MSGS"
+verify_output "squirrel,52,66413" "$KEY_2_MSGS"
+verify_output "bee,18,65647" "$KEY_3_MSGS"
+verify_output "giraffe,9,65555" "$KEY_4_MSGS"