Posted to commits@flink.apache.org by tz...@apache.org on 2018/03/26 06:25:43 UTC

[2/3] flink git commit: [FLINK-8975] [test] Add resume from savepoint end-to-end test

[FLINK-8975] [test] Add resume from savepoint end-to-end test

This closes #5733.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0c17d94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0c17d94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0c17d94

Branch: refs/heads/master
Commit: a0c17d94072ec6e127ea3cb3c507a595283b9b87
Parents: 448b9ed
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Mar 21 16:32:51 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Mar 26 14:13:51 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-pre-commit-tests.sh  |   8 +
 flink-end-to-end-tests/test-scripts/common.sh   |  22 +++
 .../test-scripts/test_resume_savepoint.sh       | 155 +++++++++++++++++++
 .../statemachine/StateMachineExample.java       |   2 +-
 4 files changed, 186 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0c17d94/flink-end-to-end-tests/run-pre-commit-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh
index 2c1810b..4fd580c 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -55,6 +55,14 @@ fi
 
 if [ $EXIT_CODE == 0 ]; then
     printf "\n==============================================================================\n"
+    printf "Running Resuming Savepoint end-to-end test\n"
+    printf "==============================================================================\n"
+    $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh
+    EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+    printf "\n==============================================================================\n"
     printf "Running class loading end-to-end test\n"
     printf "==============================================================================\n"
     $END_TO_END_DIR/test-scripts/test_streaming_classloader.sh

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c17d94/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ef4856f..d4b9126 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -111,6 +111,28 @@ function stop_cluster {
   rm $FLINK_DIR/log/*
 }
 
+function wait_job_running {
+  for i in {1..10}; do
+    JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list | grep "$1")
+
+    if [[ "$JOB_LIST_RESULT" == "" ]]; then
+      echo "Job ($1) is not yet running."
+    else
+      echo "Job ($1) is running."
+      break
+    fi
+    sleep 1
+  done
+}
+
+function take_savepoint {
+  "$FLINK_DIR"/bin/flink savepoint $1 $2
+}
+
+function cancel_job {
+  "$FLINK_DIR"/bin/flink cancel $1
+}
+
 function check_result_hash {
   local name=$1
   local outfile_prefix=$2

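The three helpers added above wrap the Flink CLI (flink run, flink savepoint, flink cancel). As a rough sketch of how they are meant to be combined (the JAR, topic name, and grep/sed extraction below mirror the test_resume_savepoint.sh script introduced by this commit; FLINK_DIR and TEST_DATA_DIR come from the test environment):

  # submit a detached job and capture its JobID from the CLI output
  JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
    --kafka-topic test-input \
    | grep "Job has been submitted with JobID" | sed 's/.* //g')

  # poll `flink list` until the job shows up as running
  wait_job_running $JOB

  # trigger a savepoint, capture its path, then cancel the job so it can be resumed later
  SAVEPOINT_PATH=$(take_savepoint $JOB $TEST_DATA_DIR \
    | grep "Savepoint completed. Path:" | sed 's/.* //g')
  cancel_job $JOB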
http://git-wip-us.apache.org/repos/asf/flink/blob/a0c17d94/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
new file mode 100755
index 0000000..7108d90
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+# modify configuration to have 2 slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e 's/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 2/' $FLINK_DIR/conf/flink-conf.yaml
+
+# modify configuration to use the SLF4J metrics reporter; we will use it to monitor the state machine job's progress
+cp $FLINK_DIR/opt/flink-metrics-slf4j-*.jar $FLINK_DIR/lib/
+echo "metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter" >> $FLINK_DIR/conf/flink-conf.yaml
+echo "metrics.reporter.slf4j.interval: 1 SECONDS" >> $FLINK_DIR/conf/flink-conf.yaml
+
+start_cluster
+
+# make sure to stop Kafka and ZooKeeper at the end, as well as to clean up the Flink cluster and our modifications
+function test_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # revert our modifications to the Flink distribution
+  rm $FLINK_DIR/conf/flink-conf.yaml
+  mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+  rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 15 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+function get_metric_state_machine_processed_records {
+  grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
+}
+
+function get_num_metric_samples {
+  grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
+}
+
+# monitor the numRecordsIn metric of the state machine operator;
+# only proceed to take the savepoint once the operator has processed at least 200 records
+while : ; do
+  NUM_RECORDS=$(get_metric_state_machine_processed_records)
+
+  if [ -z "$NUM_RECORDS" ]; then
+    NUM_RECORDS=0
+  fi
+
+  if (( $NUM_RECORDS < 200 )); then
+    echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+    sleep 1
+  else
+    break
+  fi
+done
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# Since it is not possible to differentiate reporter output between the first and second execution,
+# we remember the number of metric samples from the first execution so that they can be ignored in the subsequent monitoring
+OLD_NUM_METRICS=$(get_num_metric_samples)
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# monitor the numRecordsIn metric of the state machine operator in the second execution;
+# we let the test finish once the restored execution has processed at least 200 records
+while : ; do
+  NUM_METRICS=$(get_num_metric_samples)
+  NUM_RECORDS=$(get_metric_state_machine_processed_records)
+
+  # only account for metrics that appeared in the second execution
+  if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
+    NUM_RECORDS=0
+  fi
+
+  if (( $NUM_RECORDS < 200 )); then
+    echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+    sleep 1
+  else
+    break
+  fi
+done
+
+# if the restored state is erroneous and the state machine job produces alerting state transitions,
+# the output would be non-empty and the test would not pass

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c17d94/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 052e954..14757fb 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -108,7 +108,7 @@ public class StateMachineExample {
 		alerts.print();
 
 		// trigger program execution
-		env.execute();
+		env.execute("State machine job");
 	}
 
 	// ------------------------------------------------------------------------