Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/18 12:11:09 UTC

[2/2] flink git commit: [FLINK-8990] [test] Test partition discovery in Kafka end-to-end test

[FLINK-8990] [test] Test partition discovery in Kafka end-to-end test

This closes #5779.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d5325e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d5325e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d5325e2

Branch: refs/heads/release-1.5
Commit: 2d5325e2d4f4de26e58fc28507a673e223cd5e6d
Parents: aa54b87
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Mar 28 16:36:18 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Apr 18 20:11:00 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/kafka-common.sh                | 27 ++++++++++-
 .../test-scripts/test_streaming_kafka010.sh     | 47 ++++++++++++++++++--
 2 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d5325e2/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 7f05357..49ff4fe 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -70,5 +70,30 @@ function send_messages_to_kafka {
 }
 
 function read_messages_from_kafka {
-  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning --max-messages $1 2> /dev/null
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
+    --max-messages $1 \
+    --topic $2 \
+    --consumer-property group.id=$3 2> /dev/null
+}
+
+function modify_num_partitions {
+  $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 --zookeeper localhost:2181
+}
+
+function get_num_partitions {
+  $KAFKA_DIR/bin/kafka-topics.sh --describe --topic $1 --zookeeper localhost:2181 | grep -Eo "PartitionCount:[0-9]+" | cut -d ":" -f 2
+}
+
+function get_partition_end_offset {
+  local topic=$1
+  local partition=$2
+
+  # first, use the console consumer to create a dummy consumer group
+  read_messages_from_kafka 0 $topic dummy-consumer
+
+  # then use the consumer offset utility to get the LOG_END_OFFSET value for the specified partition
+  $KAFKA_DIR/bin/kafka-consumer-groups.sh --describe --group dummy-consumer --bootstrap-server localhost:9092 2> /dev/null \
+    | grep "$topic \+$partition" \
+    | tr -s " " \
+    | cut -d " " -f 4
 }
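
For reference, a minimal sketch (not part of the commit) of how the new helpers might be exercised against the local broker on localhost:9092 that these scripts assume, after sourcing kafka-common.sh with KAFKA_DIR pointing at the Kafka distribution used by the test suite; the script path and the "sketch-consumer" group id are hypothetical:

  #!/usr/bin/env bash
  # assumes KAFKA_DIR is exported and a broker/ZooKeeper pair is running locally
  source "$(dirname "$0")/kafka-common.sh"   # hypothetical location of the helpers

  # read up to 6 messages from test-output with a dedicated consumer group
  read_messages_from_kafka 6 test-output sketch-consumer

  # grow test-input to 2 partitions and confirm the change took effect
  modify_num_partitions test-input 2
  if (( $(get_num_partitions test-input) != 2 )); then
    echo "partition count was not updated"
  fi

  # end offset of the newly added partition (index 1); 0 means nothing was written to it
  get_partition_end_offset test-input 1

Note that read_messages_from_kafka now takes an explicit consumer group id as its third argument, presumably so that independent reads in the test do not share committed offsets.
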

http://git-wip-us.apache.org/repos/asf/flink/blob/2d5325e2/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index 4d7525f..abc6186 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -54,7 +54,8 @@ create_kafka_topic 1 1 test-output
 $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
   --input-topic test-input --output-topic test-output \
   --prefix=PREFIX \
-  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
+  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
+  --flink.partition-discovery.interval-millis 1000
 
 function verify_output {
   local expected=$(printf $1)
@@ -68,13 +69,51 @@ function verify_output {
   fi
 }
 
+echo "Sending messages to Kafka topic [test-input] ..."
 # send some data to Kafka
 send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
-KEY_1_MSGS=$(read_messages_from_kafka 6 test-output | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 6 test-output | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 6 test-output | grep bee)
+
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
 
 # check all keys; make sure we have actual newlines in the string, not "\n"
 verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
 verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
 verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
+
+# now, we add a new partition to the topic
+echo "Repartitioning Kafka topic [test-input] ..."
+modify_num_partitions test-input 2
+
+if (( $(get_num_partitions test-input) != 2 )); then
+  echo "Failed adding a partition to test-input topic."
+  PASS=""
+  exit 1
+fi
+
+# send some more messages to Kafka
+echo "Sending more messages to Kafka topic [test-input] ..."
+send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
+
+# verify our assumption that the new partition actually has messages written to it
+if (( $(get_partition_end_offset test-input 1) == 0 )); then
+  echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
+  PASS=""
+  exit 1
+fi
+
+# all new messages should have been consumed and should have produced correct output
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
+KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
+
+verify_output "elephant,27,64213" "$KEY_1_MSGS"
+verify_output "squirrel,52,66413" "$KEY_2_MSGS"
+verify_output "bee,18,65647" "$KEY_3_MSGS"
+verify_output "giraffe,9,65555" "$KEY_4_MSGS"