Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/03/22 19:37:47 UTC
[GitHub] flink pull request #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMa...
GitHub user kl0u opened a pull request:
https://github.com/apache/flink/pull/5750
[FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.
Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. A single JM is killed and re-created, and the test checks that the new JM picks up the job execution and that, at the end, the StateMachine has printed no ALERTs.
## Verifying this change
It is a script that you can run independently.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kl0u/flink ha-end-to-end-inv
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5750.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5750
----
commit 1fb36e02d79dd2299dbf7c6c6ff84b76226adf91
Author: kkloudas <kk...@...>
Date: 2018-03-15T12:13:46Z
[FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.
Adds an end-to-end test that runs the StateMachineExample on a local
cluster with HA enabled. There is a single JM which gets killed and
re-created and we check if the new JM picks up the job execution and
if at the end the StateMachine has no ALERTs printed.
----
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177356523
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---
@@ -105,7 +131,7 @@ public static void main(String[] args) throws Exception {
.flatMap(new StateMachineMapper());
// output the alerts to std-out
- alerts.print();
+ alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);
--- End diff --
Do we really need to change this? `test_resume_savepoint.sh` does not change the example.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177354347
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---
@@ -92,7 +96,29 @@ public static void main(String[] args) throws Exception {
// create the environment to create streams and configure execution
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(5000);
+
+ final String checkpointDir = params.getRequired("checkpointDir");
--- End diff --
In any case, you should update the `usage string` above, as well as the end-to-end tests that depend on this test.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176725504
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
--- End diff --
The function does not reset the `./conf/masters` file.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177341094
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,9 +162,42 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${ipPort}
--- End diff --
Does it make sense to start multiple job managers with the same `ipPort`?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177342612
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ idx=$1
+
+ jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ jm_pids=(${jm_pids[@]})
+
+ pid=${jm_pids[$idx]}
+
+ # kill the JM and wait for the completion of its termination
--- End diff --
We should remove the `wait for completion` comment.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176727208
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ idx=$1
+
+ jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ jm_pids=(${jm_pids[@]})
+
+ pid=${jm_pids[$idx]}
+
+ # kill the JM and wait for the completion of its termination
--- End diff --
`kill` and `echo` are not waiting, right?
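A minimal sketch of what actually waiting for termination could look like. It assumes the JM is not a child of the test shell, so the `wait` builtin is not an option and the process is polled with `kill -0` instead. The function name `kill_and_wait` and the retry parameter are hypothetical and not part of this PR.

```shell
#!/usr/bin/env bash

# Hypothetical helper: SIGKILL a process, then poll until it is gone.
# $1 = pid, $2 = maximum number of 0.1s polls (default 100)
kill_and_wait() {
    local pid=$1
    local retries=${2:-100}

    kill -9 "${pid}" 2> /dev/null

    # kill -0 sends no signal; it only checks whether the pid still exists
    while kill -0 "${pid}" 2> /dev/null; do
        if (( retries-- <= 0 )); then
            echo "Process ${pid} is still alive, giving up."
            return 1
        fi
        sleep 0.1
    done
    echo "Killed process ${pid}."
}
```

With a helper like this, the comment "wait for the completion of its termination" would describe what the code does; without it, removing the comment is the simpler fix.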
---
[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5750
Thanks a lot @twalthr for the review!
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177338397
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,109 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+
+ # first revert the conf/masters file
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # and then the conf/flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_config() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # clean up the dir that will be used for zookeeper storage
+ # (see high-availability.zookeeper.storageDir below)
+ if [ -e $TEST_DATA_DIR/recovery ]; then
+ echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+ rm -rf $TEST_DATA_DIR/recovery
+ fi
+
+ # then move on to create the flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 4
+ parallelism.default: 1
+
+ #==============================================================================
+ # High Availability
+ #==============================================================================
+
+ high-availability: zookeeper
+ high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+ high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.path.root: /flink
+ high-availability.cluster-id: /test_cluster_one
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+ echo "Setting up HA Cluster..."
+ create_ha_config
+ start_local_zk
+ start_cluster
+}
+
+function start_local_zk {
+ # Parses the zoo.cfg and starts locally zk.
+
+ # This is almost the same code as the
+ # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
--- End diff --
Shouldn't the end-to-end tests also exercise our own scripts end to end? With this approach we do not test what a user would actually use.
---
[GitHub] flink issue #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMachineEx...
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:
https://github.com/apache/flink/pull/5750
Based on the umbrella task link FLINK-8970, it seems like this e2e test should be attached to FLINK-8973 instead?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176734645
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
+ fi
+
+ # checks that all apart from the first JM recover the failes jobgraph.
+ if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+ echo "FAILURE: A JM did not take over."
+ PASS=""
+ fi
+
+ # search the logs for JMs that log completed checkpoints
+ if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ PASS=""
+ fi
+}
+
+run_ha_test() {
+ parallelism=$1
+ backend=$2
+ async=$3
+ incremental=$4
+ maxAttempts=$5
+ rstrtInterval=$6
+ output=$7
+
+ jmKillAndRetries=2
+ checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+ # start the cluster on HA mode and
+ # verify that all JMs are running
+ start_ha_cluster
+
+ echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
+
+ # submit a job in detached mode and let it run
+ $FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+ --stateBackend ${backend} \
+ --checkpointDir "file://${checkpointDir}" \
+ --asyncCheckpoints ${async} \
+ --incrementalCheckpoints ${incremental} \
+ --restartAttempts ${maxAttempts} \
+ --restartDelay ${rstrtInterval} \
+ --output ${output} > /dev/null
+
+ # start the watchdog that keeps the number of JMs stable
+ jm_watchdog 1 "8081" &
+ watchdogPid=$!
+
+ # let the job run for a while to take some checkpoints
+ sleep 50
+
+ for (( c=0; c<${jmKillAndRetries}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new JM which will take over
+ kill_jm 0
+ sleep 50
+ done
+
+ verify_logs ${jmKillAndRetries}
--- End diff --
Will the test fail if there are errors or will there be just error messages printed?
I don't see an `exit` or something similar
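A minimal sketch of one way to turn the `PASS` flag into a failing exit code. It assumes `PASS` is a plain shell variable where a non-empty value means "no failure so far"; the function names `check_no_alerts` and `report_and_exit` are hypothetical and only illustrate the idea.

```shell
#!/usr/bin/env bash

# Assumed convention: non-empty means "no failure seen so far".
PASS="true"

# Hypothetical check: fails when the given file contains any line.
check_no_alerts() {
    local output_file=$1
    if [ "$(wc -l < "${output_file}")" -ne 0 ]; then
        echo "FAILURE: Alerts found in ${output_file}."
        PASS=""
    fi
}

# Turn the flag into an exit code that a caller (e.g. a CI script) can see.
report_and_exit() {
    if [ -z "${PASS}" ]; then
        echo "One or more checks failed."
        exit 1
    fi
    echo "All checks passed."
    exit 0
}
```

Calling something like `report_and_exit` as the last step of the script would make a failed check visible to whatever runs the test, instead of only printing a message.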
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176766535
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
+ fi
+
+ # checks that all apart from the first JM recover the failes jobgraph.
+ if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+ echo "FAILURE: A JM did not take over."
+ PASS=""
+ fi
+
+ # search the logs for JMs that log completed checkpoints
+ if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ PASS=""
+ fi
+}
+
+run_ha_test() {
+ parallelism=$1
+ backend=$2
+ async=$3
+ incremental=$4
+ maxAttempts=$5
+ rstrtInterval=$6
+ output=$7
+
+ jmKillAndRetries=2
+ checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+ # start the cluster on HA mode and
+ # verify that all JMs are running
+ start_ha_cluster
+
+ echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
+
+ # submit a job in detached mode and let it run
+ $FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+ --stateBackend ${backend} \
+ --checkpointDir "file://${checkpointDir}" \
+ --asyncCheckpoints ${async} \
+ --incrementalCheckpoints ${incremental} \
+ --restartAttempts ${maxAttempts} \
+ --restartDelay ${rstrtInterval} \
+ --output ${output} > /dev/null
+
+ # start the watchdog that keeps the number of JMs stable
+ jm_watchdog 1 "8081" &
+ watchdogPid=$!
+
+ # let the job run for a while to take some checkpoints
+ sleep 50
+
+ for (( c=0; c<${jmKillAndRetries}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new JM which will take over
+ kill_jm 0
--- End diff --
OK :+1:
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176727554
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ idx=$1
+
+ jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ jm_pids=(${jm_pids[@]})
+
+ pid=${jm_pids[$idx]}
+
+ # kill the JM and wait for the completion of its termination
+ kill -9 ${pid}
+
+ echo "Killed JM @ ${pid}."
+}
+
+function stop_ha_cluster {
+ echo "Tearing down HA Cluster..."
+ stop_cluster
+ stop_local_zk
+ cleanup
+}
+
+function stop_local_zk {
+ while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
--- End diff --
`id` doesn't seem to be used.
`server` doesn't seem to be used either (or is it overriding the outer `server` variable?)
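A minimal sketch of the parsing step with function-local variables, so that the captured values cannot shadow or clobber anything outside the loop. The function name `list_zk_servers` is hypothetical; the pattern is the one from the diff, stored in a variable so it does not need extra escaping.

```shell
#!/usr/bin/env bash

# Hypothetical helper: print "id address" for every
# server.N=host[:port[:port]] line read from stdin.
# Other lines (comments, unrelated settings) are skipped.
list_zk_servers() {
    local line id address
    local pattern='^server\.([0-9]+)[[:space:]]*=[[:space:]]*([^: #]+)'

    # read with the default IFS already trims leading/trailing whitespace
    while read -r line; do
        if [[ ${line} =~ ${pattern} ]]; then
            id=${BASH_REMATCH[1]}
            address=${BASH_REMATCH[2]}
            echo "${id} ${address}"
        fi
    done
}
```

It could be fed with something like `list_zk_servers < "${FLINK_DIR}/conf/zoo.cfg"`, assuming that is the file being parsed.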
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176732031
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
+ fi
+
+ # checks that all apart from the first JM recover the failes jobgraph.
+ if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+ echo "FAILURE: A JM did not take over."
+ PASS=""
+ fi
+
+ # search the logs for JMs that log completed checkpoints
+ if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ PASS=""
+ fi
+}
+
+run_ha_test() {
+ parallelism=$1
+ backend=$2
+ async=$3
+ incremental=$4
+ maxAttempts=$5
+ rstrtInterval=$6
+ output=$7
+
+ jmKillAndRetries=2
+ checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+ # start the cluster on HA mode and
+ # verify that all JMs are running
+ start_ha_cluster
+
+ echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
+
+ # submit a job in detached mode and let it run
+ $FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+ --stateBackend ${backend} \
+ --checkpointDir "file://${checkpointDir}" \
+ --asyncCheckpoints ${async} \
+ --incrementalCheckpoints ${incremental} \
+ --restartAttempts ${maxAttempts} \
+ --restartDelay ${rstrtInterval} \
+ --output ${output} > /dev/null
+
+ # start the watchdog that keeps the number of JMs stable
+ jm_watchdog 1 "8081" &
+ watchdogPid=$!
+
+ # let the job run for a while to take some checkpoints
+ sleep 50
+
+ for (( c=0; c<${jmKillAndRetries}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new JM which will take over
+ kill_jm 0
+ sleep 50
+ done
+
+ verify_logs ${jmKillAndRetries}
+
+ # kill the cluster and zookeeper
+ stop_cluster_and_watchdog
+}
+
+run_ha_test 1 "file" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
+run_ha_test 1 "rocks" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
+run_ha_test 1 "file" "true" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
+run_ha_test 1 "rocks" "false" "true" 3 100 "${TEST_DATA_DIR}/output.txt"
+trap stop_cluster_and_watchdog EXIT
--- End diff --
I don't have much experience with `trap` but the examples I've seen run it before the code that it should guard and not at the end of the script when there's nothing left to do.
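A minimal sketch of the ordering issue: an `EXIT` trap only covers code that runs after it has been registered, so it belongs before the guarded steps. The names `cleanup_on_exit` and `run_guarded` are hypothetical; in this PR the cleanup role is played by `stop_cluster_and_watchdog`.

```shell
#!/usr/bin/env bash

# Hypothetical cleanup function standing in for the real teardown.
cleanup_on_exit() {
    echo "cleanup ran"
}

run_guarded() {
    # Run in a subshell so that the trap and the exit stay local to this call.
    (
        # Register the handler first, so it also fires if a later step fails.
        trap cleanup_on_exit EXIT

        echo "test body running"
        false || exit 1   # simulate a failing step that aborts the script
        echo "never reached"
    )
}
```

Here the cleanup still runs even though the body aborts early. With the `trap` line at the very end of the script, an early abort would leave the cluster and the watchdog running.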
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176725740
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # then move on to create the flink-conf.yaml
+
+ if [ -e $TEST_DATA_DIR/recovery ]; then
--- End diff --
Please add a comment here.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176732649
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
--- End diff --
What is `PASS` being used for?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176765903
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
+ fi
+
+ # checks that all apart from the first JM recover the failes jobgraph.
+ if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+ echo "FAILURE: A JM did not take over."
+ PASS=""
+ fi
+
+ # search the logs for JMs that log completed checkpoints
+ if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ PASS=""
+ fi
+}
+
+run_ha_test() {
+ parallelism=$1
+ backend=$2
+ async=$3
+ incremental=$4
+ maxAttempts=$5
+ rstrtInterval=$6
+ output=$7
+
+ jmKillAndRetries=2
+ checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+ # start the cluster on HA mode and
+ # verify that all JMs are running
+ start_ha_cluster
+
+ echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
+
+ # submit a job in detached mode and let it run
+ $FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+ --stateBackend ${backend} \
+ --checkpointDir "file://${checkpointDir}" \
+ --asyncCheckpoints ${async} \
+ --incrementalCheckpoints ${incremental} \
+ --restartAttempts ${maxAttempts} \
+ --restartDelay ${rstrtInterval} \
+ --output ${output} > /dev/null
+
+ # start the watchdog that keeps the number of JMs stable
+ jm_watchdog 1 "8081" &
+ watchdogPid=$!
+
+ # let the job run for a while to take some checkpoints
+ sleep 50
+
+ for (( c=0; c<${jmKillAndRetries}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new JM which will take over
+ kill_jm 0
--- End diff --
There is only one running at any point in time.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177343482
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,9 +162,42 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${ipPort}
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ idx=$1
+
+ jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ jm_pids=(${jm_pids[@]})
+
+ pid=${jm_pids[$idx]}
+
+ # kill the JM and wait for the completion of its termination
+ kill -9 ${pid}
+
+ echo "Killed JM @ ${pid}."
+}
+
function stop_cluster {
"$FLINK_DIR"/bin/stop-cluster.sh
+ # stop zookeeper only if there are processes running
--- End diff --
I would move this to a `stop_ha_cluster` function. Btw we also don't call `stop-zookeeper-quorum.sh`.
---
[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/5750
Thank you @kl0u. I found a little bug that I will fix. Merging...
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176730412
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
+ fi
+
+ # checks that all apart from the first JM recover the failed jobgraph.
+ if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+ echo "FAILURE: A JM did not take over."
+ PASS=""
+ fi
+
+ # search the logs for JMs that log completed checkpoints
+ if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ PASS=""
+ fi
+}
+
+run_ha_test() {
+ parallelism=$1
+ backend=$2
+ async=$3
+ incremental=$4
+ maxAttempts=$5
+ rstrtInterval=$6
+ output=$7
+
+ jmKillAndRetries=2
+ checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+ # start the cluster on HA mode and
+ # verify that all JMs are running
+ start_ha_cluster
+
+ echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
+
+ # submit a job in detached mode and let it run
+ $FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+ --stateBackend ${backend} \
+ --checkpointDir "file://${checkpointDir}" \
+ --asyncCheckpoints ${async} \
+ --incrementalCheckpoints ${incremental} \
+ --restartAttempts ${maxAttempts} \
+ --restartDelay ${rstrtInterval} \
+ --output ${output} > /dev/null
+
+ # start the watchdog that keeps the number of JMs stable
+ jm_watchdog 1 "8081" &
+ watchdogPid=$!
+
+ # let the job run for a while to take some checkpoints
+ sleep 50
+
+ for (( c=0; c<${jmKillAndRetries}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new JM which will take over
+ kill_jm 0
--- End diff --
Are you sure that this will kill the JM that is in charge of running the job?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177344409
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,119 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+watch_dog_pid=0
+
+stop_cluster_and_watchdog() {
--- End diff --
We should use a consistent coding style. Add `function` keywords?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5750
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177436350
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,9 +162,42 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${ipPort}
--- End diff --
Actually we launch only one, and we kill it and then start a new one.
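To make the "only one JM" logic of the watchdog easier to follow, the count of JMs to restart can be sketched as a small function. This assumes `jps`-style input on stdin; `missing_jms` is a hypothetical name, not something defined in `common.sh`:

```shell
# Hypothetical helper: given the expected number of JMs as $1 and `jps`-style
# output on stdin, prints how many JMs the watchdog would need to start.
missing_jms() {
  local expected="$1" running
  # grep -c prints 0 but exits non-zero when nothing matches, hence "|| true"
  running=$(grep -c 'StandaloneSessionClusterEntrypoint' || true)
  local missing=$(( expected - running ))
  if [ "${missing}" -lt 0 ]; then
    missing=0
  fi
  echo "${missing}"
}
```

With `expectedJms=1`, `jps | missing_jms 1` would print 1 right after the JM was killed and 0 once the replacement is up.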
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176725241
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
--- End diff --
`conf` or `config`? Keep in sync with `revert_default_config()`
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176761099
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
--- End diff --
yes, this is because we do not change it here. But I will also change it for future safety.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177344142
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,119 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
--- End diff --
Use `$FLINK_DIR/examples/streaming` instead?
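A rough sketch of what that could look like, assuming the built distribution ships the example under `examples/streaming`. `streaming_example_jar` and `TEST_PROGRAM_ARGS` are hypothetical names used only for illustration:

```shell
# Assumption: the first argument points at a built Flink distribution and the
# example jars live under examples/streaming.
streaming_example_jar() {
  local flink_dir="$1" name="$2"
  echo "${flink_dir}/examples/streaming/${name}.jar"
}

# Keeping the program arguments in an array avoids escaping spaces in the jar path.
TEST_PROGRAM_ARGS=(--error-rate 0.0 --sleep 2)
```

The submission would then pass `"$(streaming_example_jar "$FLINK_DIR" StateMachineExample)"` followed by `"${TEST_PROGRAM_ARGS[@]}"`.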
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177340291
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,109 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+
+ # first revert the conf/masters file
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # and then the conf/flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_config() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # clean up the dir that will be used for zookeeper storage
+ # (see high-availability.zookeeper.storageDir below)
+ if [ -e $TEST_DATA_DIR/recovery ]; then
+ echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+ rm -rf $TEST_DATA_DIR/recovery
+ fi
+
+ # then move on to create the flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 4
+ parallelism.default: 1
+
+ #==============================================================================
+ # High Availability
+ #==============================================================================
+
+ high-availability: zookeeper
+ high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+ high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.path.root: /flink
+ high-availability.cluster-id: /test_cluster_one
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+ echo "Setting up HA Cluster..."
+ create_ha_config
+ start_local_zk
+ start_cluster
+}
+
+function start_local_zk {
+ # Parses the zoo.cfg and starts locally zk.
+
+ # This is almost the same code as the
+ # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
+
+ while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
+ address=${BASH_REMATCH[2]}
+
+ if [ "${address}" != "localhost" ]; then
+ echo "[WARN] Parse error. Only available for localhost."
--- End diff --
`[ERROR]` instead of `[WARN]`? We fail hard here, right?
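Something along these lines would make the severity and the failure go together. This is only a sketch; `fail_hard` and `require_localhost` are hypothetical helpers, not functions from `common.sh`:

```shell
# Hypothetical helper: report a fatal problem on stderr and signal failure.
fail_hard() {
  echo "[ERROR] $*" >&2
  return 1
}

# Hypothetical check mirroring the localhost restriction in the diff above.
require_localhost() {
  [ "$1" = "localhost" ] || fail_hard "Parse error. Only available for localhost."
}
```

A caller could write `require_localhost "${address}" || exit 1`, so the log level matches what actually happens.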
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176770591
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+ kill ${watchdogPid} 2> /dev/null
+ wait ${watchdogPid} 2> /dev/null
+
+ stop_ha_cluster
+}
+
+verify_logs() {
+ expectedRetries=$1
+
+ # verify that we have no alerts
+ if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
--- End diff --
It is used in the `check_all_pass` in the `common.sh`. Kind of obscure, but this is the structure for now...
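For readers unfamiliar with the pattern, here is a minimal sketch of how such a flag can work. The body of `check_all_pass` is not shown in this thread, so `report_result` below is a hypothetical stand-in, and `expect_empty_file` is an illustrative check:

```shell
# Assumption: PASS starts non-empty and any failed check clears it.
PASS=1

# Hypothetical check: clears PASS when the given file contains any lines.
expect_empty_file() {
  if [ "$(wc -l < "$1")" -ne 0 ]; then
    echo "FAILURE: unexpected output in $1."
    PASS=""
  fi
}

# Hypothetical stand-in for check_all_pass: fails if any check cleared PASS.
report_result() {
  if [ -z "${PASS}" ]; then
    echo "One or more checks FAILED."
    return 1
  fi
  echo "All checks PASSED."
}
```

The checks must run in the current shell rather than a subshell, otherwise the assignment to `PASS` is lost.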
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177353937
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---
@@ -92,7 +96,29 @@ public static void main(String[] args) throws Exception {
// create the environment to create streams and configure execution
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(5000);
+
+ final String checkpointDir = params.getRequired("checkpointDir");
--- End diff --
We could also do this via `flink-conf.yaml` and avoid making the parameters of the example too complicated.
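As a sketch of that idea, the test script could append the settings to the generated configuration instead of passing them as program arguments. The keys below are the ones I understand Flink's configuration to use and should be checked against the version under test; the values are placeholders and `append_checkpoint_config` is a hypothetical helper:

```shell
# Hypothetical helper: append_checkpoint_config CONF_FILE CHECKPOINT_DIR
# Appends state backend and restart strategy settings to the given config file.
append_checkpoint_config() {
  cat >> "$1" << EOL
state.backend: rocksdb
state.checkpoints.dir: file://$2
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
EOL
}
```

It would be called as `append_checkpoint_config "${FLINK_DIR}/conf/flink-conf.yaml" "${TEST_DATA_DIR}/checkpoints"`. The checkpoint interval would presumably still have to be set in the job itself via `env.enableCheckpointing(...)`.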
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177335437
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -93,6 +93,13 @@ if [ $EXIT_CODE == 0 ]; then
EXIT_CODE=$?
fi
+#if [ $EXIT_CODE == 0 ]; then
--- End diff --
Remove this. This test is too complicated for pre-commit tests.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176725711
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # then move on to create the flink-conf.yaml
--- End diff --
move comment down to `sed 's/^ //g' ...`
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176762986
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ idx=$1
+
+ jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ jm_pids=(${jm_pids[@]})
+
+ pid=${jm_pids[$idx]}
+
+ # kill the JM and wait for the completion of its termination
+ kill -9 ${pid}
+
+ echo "Killed JM @ ${pid}."
+}
+
+function stop_ha_cluster {
+ echo "Tearing down HA Cluster..."
+ stop_cluster
+ stop_local_zk
+ cleanup
+}
+
+function stop_local_zk {
+ while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
--- End diff --
server is used in the warning message.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177364376
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,109 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+
+ # first revert the conf/masters file
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # and then the conf/flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_config() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # clean up the dir that will be used for zookeeper storage
+ # (see high-availability.zookeeper.storageDir below)
+ if [ -e $TEST_DATA_DIR/recovery ]; then
+ echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+ rm -rf $TEST_DATA_DIR/recovery
+ fi
+
+ # then move on to create the flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 4
+ parallelism.default: 1
+
+ #==============================================================================
+ # High Availability
+ #==============================================================================
+
+ high-availability: zookeeper
+ high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+ high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.path.root: /flink
+ high-availability.cluster-id: /test_cluster_one
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+ echo "Setting up HA Cluster..."
+ create_ha_config
+ start_local_zk
+ start_cluster
+}
+
+function start_local_zk {
+ # Parses the zoo.cfg and starts locally zk.
+
+ # This is almost the same code as the
+ # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
--- End diff --
The problem is that the `zookeeper` script does not take into account the fact that we may want to launch `zookeeper` locally, so it always asks for `SSH` credentials.
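The local variant boils down to the loop below. This is a sketch rather than the exact code in the PR: `start_local_zk_from` is a hypothetical name, and it assumes the per-peer launcher accepts `start <id>`, as I believe `bin/zookeeper.sh` does:

```shell
# Hypothetical helper: start_local_zk_from CONF LAUNCHER
# For every "server.<id>=localhost[:port[:port]]" entry in CONF, runs
# "LAUNCHER start <id>" on this machine, with no SSH involved.
start_local_zk_from() {
  local conf="$1" launcher="$2" server
  local re='^server\.([0-9]+)[[:space:]]*=[[:space:]]*([^: #]+)'
  # read with the default IFS already trims leading and trailing whitespace
  while read -r server; do
    if [[ $server =~ $re ]]; then
      if [ "${BASH_REMATCH[2]}" != "localhost" ]; then
        echo "[ERROR] Only localhost is supported: ${server}" >&2
        return 1
      fi
      # keep the launcher from consuming the rest of the config on stdin
      "${launcher}" start "${BASH_REMATCH[1]}" < /dev/null
    fi
  done < "${conf}"
}
```

In the test it would be invoked as `start_local_zk_from "${FLINK_DIR}/conf/zoo.cfg" "${FLINK_DIR}/bin/zookeeper.sh"`.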
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176726476
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # then move on to create the flink-conf.yaml
+
+ if [ -e $TEST_DATA_DIR/recovery ]; then
+ echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+ rm -rf $TEST_DATA_DIR/recovery
+ fi
+
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 4
+ parallelism.default: 1
+
+ #==============================================================================
+ # High Availability
+ #==============================================================================
+
+ high-availability: zookeeper
+ high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+ high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.path.root: /flink
+ high-availability.cluster-id: /test_cluster_one
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+ echo "Setting up HA Cluster..."
+ create_ha_conf
+ start_local_zk
+ start_cluster
+}
+
+function start_local_zk {
+ while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
+ address=${BASH_REMATCH[2]}
--- End diff --
`address` seems to be unused
---
[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5750
Hi @twalthr, I addressed most of your comments. Feel free to have another look.
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r177343812
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,119 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
--- End diff --
Remove this?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176725915
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 1
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # then move on to create the flink-conf.yaml
+
+ if [ -e $TEST_DATA_DIR/recovery ]; then
+ echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+ rm -rf $TEST_DATA_DIR/recovery
+ fi
+
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 4
+ parallelism.default: 1
+
+ #==============================================================================
+ # High Availability
+ #==============================================================================
+
+ high-availability: zookeeper
+ high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+ high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.path.root: /flink
+ high-availability.cluster-id: /test_cluster_one
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+ echo "Setting up HA Cluster..."
+ create_ha_conf
+ start_local_zk
+ start_cluster
+}
+
+function start_local_zk {
--- End diff --
add a few comments to this method?
---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5750#discussion_r176767031
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
done
}
+function jm_watchdog() {
+ expectedJms=$1
+ ipPort=$2
+
+ while true; do
+ runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ missingJms=$((expectedJms-runningJms))
+ for (( c=0; c<missingJms; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ idx=$1
+
+ jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ jm_pids=(${jm_pids[@]})
+
+ pid=${jm_pids[$idx]}
+
+ # kill the JM and wait for the completion of its termination
+ kill -9 ${pid}
+
+ echo "Killed JM @ ${pid}."
+}
+
+function stop_ha_cluster {
+ echo "Tearing down HA Cluster..."
+ stop_cluster
+ stop_local_zk
+ cleanup
+}
+
+function stop_local_zk {
+ while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
--- End diff --
but the assignment should never happen if we enter the `else` branch.
There is another assignment to `server` outside of the condition.
---