Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/03/22 19:37:47 UTC

[GitHub] flink pull request #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMa...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/5750

    [FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.

    Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM, which gets killed and re-created, and we check whether the new JM picks up the job execution and whether, at the end, the StateMachine has printed no ALERTs.
    
    ## Verifying this change
    
    It is a script that you can run independently.
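    For example, the test can be invoked directly against a local build (the path below is a placeholder for your own Flink build directory, as in the commented-out line inside the script):
    
        FLINK_DIR=<path-to-flink-build> flink-end-to-end-tests/test-scripts/test_ha.sh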
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink ha-end-to-end-inv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5750
    
----
commit 1fb36e02d79dd2299dbf7c6c6ff84b76226adf91
Author: kkloudas <kk...@...>
Date:   2018-03-15T12:13:46Z

    [FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.
    
    Adds an end-to-end test that runs the StateMachineExample on a local
    cluster with HA enabled. There is a single JM which gets killed and
    re-created and we check if the new JM picks up the job execution and
    if at the end the StateMachine has no ALERTs printed.

----


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177356523
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---
    @@ -105,7 +131,7 @@ public static void main(String[] args) throws Exception {
     				.flatMap(new StateMachineMapper());
     
     		// output the alerts to std-out
    -		alerts.print();
    +		alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);
    --- End diff --
    
    Do we really need to change this? `test_resume_savepoint.sh` does not change the example.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177354347
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---
    @@ -92,7 +96,29 @@ public static void main(String[] args) throws Exception {
     
     		// create the environment to create streams and configure execution
     		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -		env.enableCheckpointing(5000);
    +
    +		final String checkpointDir = params.getRequired("checkpointDir");
    --- End diff --
    
    In any case, you should update the `usage` string above, as well as the end-to-end tests that depend on this example.
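    For illustration, a hedged sketch of what the updated usage string could look like (the parameter list is an assumption, derived from the flags that test_ha.sh passes to the example):
    
        System.out.println("Usage: StateMachineExample --error-rate <probability> --sleep <ms-per-record> " +
                "--checkpointDir <checkpoint-directory> --stateBackend <file|rocks> " +
                "--asyncCheckpoints <true|false> --incrementalCheckpoints <true|false> " +
                "--restartAttempts <n> --restartDelay <ms> --output <output-file>");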


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176725504
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    --- End diff --
    
    The function does not reset the `./conf/masters` file.
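    A minimal sketch of the missing reset, restoring the default single-entry masters file:
    
        echo "localhost:8081" > ${FLINK_DIR}/conf/masters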


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177341094
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,9 +162,42 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${ipPort}
    --- End diff --
    
    Does it make sense to start multiple job managers with the same `ipPort`?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177342612
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,6 +146,57 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
    +        done
    +        sleep 5;
    +    done
    +}
    +
    +function kill_jm {
    +    idx=$1
    +
    +    jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    jm_pids=(${jm_pids[@]})
    +
    +    pid=${jm_pids[$idx]}
    +
    +    # kill the JM and wait for the completion of its termination
    --- End diff --
    
    We should remove the `wait for completion` comment.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176727208
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,6 +146,57 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
    +        done
    +        sleep 5;
    +    done
    +}
    +
    +function kill_jm {
    +    idx=$1
    +
    +    jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    jm_pids=(${jm_pids[@]})
    +
    +    pid=${jm_pids[$idx]}
    +
    +    # kill the JM and wait for the completion of its termination
    --- End diff --
    
    `kill` and `echo` are not waiting, right?
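    For reference, a small sketch of how the script could actually wait for termination after the kill (not part of this PR; `kill -0` only probes whether the process still exists):
    
        kill -9 ${pid}
        # loop until the killed JM process has disappeared
        while kill -0 ${pid} 2> /dev/null; do
            sleep 1
        done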


---

[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5750
  
    Thanks a lot @twalthr for the review!  


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177338397
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,109 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +
    +    # first revert the conf/masters file
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # and then the conf/flink-conf.yaml
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_config() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # clean up the dir that will be used for zookeeper storage
    +    # (see high-availability.zookeeper.storageDir below)
    +    if [ -e $TEST_DATA_DIR/recovery ]; then
    +       echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
    +       rm -rf $TEST_DATA_DIR/recovery
    +    fi
    +
    +    # then move on to create the flink-conf.yaml
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 4
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # High Availability
    +    #==============================================================================
    +
    +    high-availability: zookeeper
    +    high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
    +    high-availability.zookeeper.quorum: localhost:2181
    +    high-availability.zookeeper.path.root: /flink
    +    high-availability.cluster-id: /test_cluster_one
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function start_ha_cluster {
    +    echo "Setting up HA Cluster..."
    +    create_ha_config
    +    start_local_zk
    +    start_cluster
    +}
    +
    +function start_local_zk {
    +    # Parses the zoo.cfg and starts locally zk.
    +
    +    # This is almost the same code as the
    +    # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
    --- End diff --
    
    Shouldn't the 'end-to-end tests' also test end-to-end, i.e., exercise our scripts as well? This way we do not test what a user would actually use.


---

[GitHub] flink issue #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMachineEx...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:

    https://github.com/apache/flink/pull/5750
  
    Based on the umbrella task link FLINK-8970, it seems like this e2e test should be attached to FLINK-8973 instead?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176734645
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    +    fi
    +
    +    # checks that all apart from the first JM recover the failes jobgraph.
    +    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
    +        echo "FAILURE: A JM did not take over."
    +        PASS=""
    +    fi
    +
    +    # search the logs for JMs that log completed checkpoints
    +    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
    +        echo "FAILURE: A JM did not execute the job."
    +        PASS=""
    +    fi
    +}
    +
    +run_ha_test() {
    +    parallelism=$1
    +    backend=$2
    +    async=$3
    +    incremental=$4
    +    maxAttempts=$5
    +    rstrtInterval=$6
    +    output=$7
    +
    +    jmKillAndRetries=2
    +    checkpointDir="${TEST_DATA_DIR}/checkpoints/"
    +
    +    # start the cluster on HA mode and
    +    # verify that all JMs are running
    +    start_ha_cluster
    +
    +    echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
    +
    +    # submit a job in detached mode and let it run
    +    $FLINK_DIR/bin/flink run -d -p ${parallelism} \
    +     $TEST_PROGRAM_JAR \
    +        --stateBackend ${backend} \
    +        --checkpointDir "file://${checkpointDir}" \
    +        --asyncCheckpoints ${async} \
    +        --incrementalCheckpoints ${incremental} \
    +        --restartAttempts ${maxAttempts} \
    +        --restartDelay ${rstrtInterval} \
    +        --output ${output} > /dev/null
    +
    +    # start the watchdog that keeps the number of JMs stable
    +    jm_watchdog 1 "8081" &
    +    watchdogPid=$!
    +
    +    # let the job run for a while to take some checkpoints
    +    sleep 50
    +
    +    for (( c=0; c<${jmKillAndRetries}; c++ )); do
    +        # kill the JM and wait for watchdog to
    +        # create a new JM which will take over
    +        kill_jm 0
    +        sleep 50
    +    done
    +
    +    verify_logs ${jmKillAndRetries}
    --- End diff --
    
    Will the test fail if there are errors, or will the error messages just be printed?
    I don't see an `exit` or anything similar.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176766535
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    +    fi
    +
    +    # checks that all apart from the first JM recover the failes jobgraph.
    +    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
    +        echo "FAILURE: A JM did not take over."
    +        PASS=""
    +    fi
    +
    +    # search the logs for JMs that log completed checkpoints
    +    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
    +        echo "FAILURE: A JM did not execute the job."
    +        PASS=""
    +    fi
    +}
    +
    +run_ha_test() {
    +    parallelism=$1
    +    backend=$2
    +    async=$3
    +    incremental=$4
    +    maxAttempts=$5
    +    rstrtInterval=$6
    +    output=$7
    +
    +    jmKillAndRetries=2
    +    checkpointDir="${TEST_DATA_DIR}/checkpoints/"
    +
    +    # start the cluster on HA mode and
    +    # verify that all JMs are running
    +    start_ha_cluster
    +
    +    echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
    +
    +    # submit a job in detached mode and let it run
    +    $FLINK_DIR/bin/flink run -d -p ${parallelism} \
    +     $TEST_PROGRAM_JAR \
    +        --stateBackend ${backend} \
    +        --checkpointDir "file://${checkpointDir}" \
    +        --asyncCheckpoints ${async} \
    +        --incrementalCheckpoints ${incremental} \
    +        --restartAttempts ${maxAttempts} \
    +        --restartDelay ${rstrtInterval} \
    +        --output ${output} > /dev/null
    +
    +    # start the watchdog that keeps the number of JMs stable
    +    jm_watchdog 1 "8081" &
    +    watchdogPid=$!
    +
    +    # let the job run for a while to take some checkpoints
    +    sleep 50
    +
    +    for (( c=0; c<${jmKillAndRetries}; c++ )); do
    +        # kill the JM and wait for watchdog to
    +        # create a new JM which will take over
    +        kill_jm 0
    --- End diff --
    
    OK :+1:


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176727554
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,6 +146,57 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
    +        done
    +        sleep 5;
    +    done
    +}
    +
    +function kill_jm {
    +    idx=$1
    +
    +    jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    jm_pids=(${jm_pids[@]})
    +
    +    pid=${jm_pids[$idx]}
    +
    +    # kill the JM and wait for the completion of its termination
    +    kill -9 ${pid}
    +
    +    echo "Killed JM @ ${pid}."
    +}
    +
    +function stop_ha_cluster {
    +    echo "Tearing down HA Cluster..."
    +    stop_cluster
    +    stop_local_zk
    +    cleanup
    +}
    +
    +function stop_local_zk {
    +    while read server ; do
    +        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
    +
    +        # match server.id=address[:port[:port]]
    +        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
    +            id=${BASH_REMATCH[1]}
    --- End diff --
    
    `id` doesn't seem to be used.
    `server` doesn't seem to be used either (or is it overriding the outer `server` variable?).


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176732031
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    +    fi
    +
    +    # checks that all apart from the first JM recover the failes jobgraph.
    +    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
    +        echo "FAILURE: A JM did not take over."
    +        PASS=""
    +    fi
    +
    +    # search the logs for JMs that log completed checkpoints
    +    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
    +        echo "FAILURE: A JM did not execute the job."
    +        PASS=""
    +    fi
    +}
    +
    +run_ha_test() {
    +    parallelism=$1
    +    backend=$2
    +    async=$3
    +    incremental=$4
    +    maxAttempts=$5
    +    rstrtInterval=$6
    +    output=$7
    +
    +    jmKillAndRetries=2
    +    checkpointDir="${TEST_DATA_DIR}/checkpoints/"
    +
    +    # start the cluster on HA mode and
    +    # verify that all JMs are running
    +    start_ha_cluster
    +
    +    echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
    +
    +    # submit a job in detached mode and let it run
    +    $FLINK_DIR/bin/flink run -d -p ${parallelism} \
    +     $TEST_PROGRAM_JAR \
    +        --stateBackend ${backend} \
    +        --checkpointDir "file://${checkpointDir}" \
    +        --asyncCheckpoints ${async} \
    +        --incrementalCheckpoints ${incremental} \
    +        --restartAttempts ${maxAttempts} \
    +        --restartDelay ${rstrtInterval} \
    +        --output ${output} > /dev/null
    +
    +    # start the watchdog that keeps the number of JMs stable
    +    jm_watchdog 1 "8081" &
    +    watchdogPid=$!
    +
    +    # let the job run for a while to take some checkpoints
    +    sleep 50
    +
    +    for (( c=0; c<${jmKillAndRetries}; c++ )); do
    +        # kill the JM and wait for watchdog to
    +        # create a new JM which will take over
    +        kill_jm 0
    +        sleep 50
    +    done
    +
    +    verify_logs ${jmKillAndRetries}
    +
    +    # kill the cluster and zookeeper
    +    stop_cluster_and_watchdog
    +}
    +
    +run_ha_test 1 "file" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
    +run_ha_test 1 "rocks" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
    +run_ha_test 1 "file" "true" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
    +run_ha_test 1 "rocks" "false" "true" 3 100 "${TEST_DATA_DIR}/output.txt"
    +trap stop_cluster_and_watchdog EXIT
    --- End diff --
    
    I don't have much experience with `trap`, but the examples I've seen run it before the code that it should guard, not at the end of the script when there's nothing left to do.
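    For reference, a sketch of the conventional ordering, registering the trap before the calls it should guard (the lines are the ones from this script, just reordered):
    
        # register the cleanup handler first, then run the guarded tests
        trap stop_cluster_and_watchdog EXIT
    
        run_ha_test 1 "file" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
        run_ha_test 1 "rocks" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
        run_ha_test 1 "file" "true" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
        run_ha_test 1 "rocks" "false" "true" 3 100 "${TEST_DATA_DIR}/output.txt"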


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176725740
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_conf() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # then move on to create the flink-conf.yaml
    +
    +    if [ -e $TEST_DATA_DIR/recovery ]; then
    --- End diff --
    
    add a comment


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176732649
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    --- End diff --
    
    What is `PASS` being used for?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176765903
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    +    fi
    +
    +    # checks that all apart from the first JM recover the failes jobgraph.
    +    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
    +        echo "FAILURE: A JM did not take over."
    +        PASS=""
    +    fi
    +
    +    # search the logs for JMs that log completed checkpoints
    +    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
    +        echo "FAILURE: A JM did not execute the job."
    +        PASS=""
    +    fi
    +}
    +
    +run_ha_test() {
    +    parallelism=$1
    +    backend=$2
    +    async=$3
    +    incremental=$4
    +    maxAttempts=$5
    +    rstrtInterval=$6
    +    output=$7
    +
    +    jmKillAndRetries=2
    +    checkpointDir="${TEST_DATA_DIR}/checkpoints/"
    +
    +    # start the cluster on HA mode and
    +    # verify that all JMs are running
    +    start_ha_cluster
    +
    +    echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
    +
    +    # submit a job in detached mode and let it run
    +    $FLINK_DIR/bin/flink run -d -p ${parallelism} \
    +     $TEST_PROGRAM_JAR \
    +        --stateBackend ${backend} \
    +        --checkpointDir "file://${checkpointDir}" \
    +        --asyncCheckpoints ${async} \
    +        --incrementalCheckpoints ${incremental} \
    +        --restartAttempts ${maxAttempts} \
    +        --restartDelay ${rstrtInterval} \
    +        --output ${output} > /dev/null
    +
    +    # start the watchdog that keeps the number of JMs stable
    +    jm_watchdog 1 "8081" &
    +    watchdogPid=$!
    +
    +    # let the job run for a while to take some checkpoints
    +    sleep 50
    +
    +    for (( c=0; c<${jmKillAndRetries}; c++ )); do
    +        # kill the JM and wait for watchdog to
    +        # create a new JM which will take over
    +        kill_jm 0
    --- End diff --
    
    There is only one running at any point in time.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177343482
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,9 +162,42 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${ipPort}
    +        done
    +        sleep 5;
    +    done
    +}
    +
    +function kill_jm {
    +    idx=$1
    +
    +    jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    jm_pids=(${jm_pids[@]})
    +
    +    pid=${jm_pids[$idx]}
    +
    +    # kill the JM and wait for the completion of its termination
    +    kill -9 ${pid}
    +
    +    echo "Killed JM @ ${pid}."
    +}
    +
     function stop_cluster {
       "$FLINK_DIR"/bin/stop-cluster.sh
     
    +  # stop zookeeper only if there are processes running
    --- End diff --
    
    I would move this to a `stop_ha_cluster` function. Btw we also don't call `stop-zookeeper-quorum.sh`.
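    A hedged sketch of that refactoring, reusing the helpers already defined in common.sh:
    
        function stop_ha_cluster {
            echo "Tearing down HA Cluster..."
            stop_cluster
            # stop the local ZooKeeper started for the HA setup;
            # alternatively: "$FLINK_DIR"/bin/stop-zookeeper-quorum.sh
            stop_local_zk
            cleanup
        }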


---

[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5750
  
    Thank you @kl0u. I found a little bug that I will fix. Merging...


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176730412
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    +    fi
    +
    +    # checks that all apart from the first JM recover the failes jobgraph.
    +    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
    +        echo "FAILURE: A JM did not take over."
    +        PASS=""
    +    fi
    +
    +    # search the logs for JMs that log completed checkpoints
    +    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
    +        echo "FAILURE: A JM did not execute the job."
    +        PASS=""
    +    fi
    +}
    +
    +run_ha_test() {
    +    parallelism=$1
    +    backend=$2
    +    async=$3
    +    incremental=$4
    +    maxAttempts=$5
    +    rstrtInterval=$6
    +    output=$7
    +
    +    jmKillAndRetries=2
    +    checkpointDir="${TEST_DATA_DIR}/checkpoints/"
    +
    +    # start the cluster on HA mode and
    +    # verify that all JMs are running
    +    start_ha_cluster
    +
    +    echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}."
    +
    +    # submit a job in detached mode and let it run
    +    $FLINK_DIR/bin/flink run -d -p ${parallelism} \
    +     $TEST_PROGRAM_JAR \
    +        --stateBackend ${backend} \
    +        --checkpointDir "file://${checkpointDir}" \
    +        --asyncCheckpoints ${async} \
    +        --incrementalCheckpoints ${incremental} \
    +        --restartAttempts ${maxAttempts} \
    +        --restartDelay ${rstrtInterval} \
    +        --output ${output} > /dev/null
    +
    +    # start the watchdog that keeps the number of JMs stable
    +    jm_watchdog 1 "8081" &
    +    watchdogPid=$!
    +
    +    # let the job run for a while to take some checkpoints
    +    sleep 50
    +
    +    for (( c=0; c<${jmKillAndRetries}; c++ )); do
    +        # kill the JM and wait for watchdog to
    +        # create a new JM which will take over
    +        kill_jm 0
    --- End diff --
    
    Are you sure that this will kill the JM that is in charge of running the job?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177344409
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,119 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +watch_dog_pid=0
    +
    +stop_cluster_and_watchdog() {
    --- End diff --
    
    We should use a consistent coding style. Add `function` keywords?
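    For example (the body is taken from the earlier revision of the script; the variable name of this revision is assumed):
    
        function stop_cluster_and_watchdog {
            kill ${watch_dog_pid} 2> /dev/null
            wait ${watch_dog_pid} 2> /dev/null
    
            stop_ha_cluster
        }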


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5750


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177436350
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,9 +162,42 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${ipPort}
    --- End diff --
    
    Actually we launch only one, and we kill it and then start a new one.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176725241
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_conf() {
    --- End diff --
    
    `conf` or `config`? Keep in sync with `revert_default_config()`


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176761099
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    --- End diff --
    
    Yes, this is because we do not change it here. But I will also change it, for future safety.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177344142
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,119 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    --- End diff --
    
    Use `$FLINK_DIR/examples/streaming` instead?
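    i.e., something along these lines (the exact location of the example jar inside the distribution is an assumption):
    
        TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2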


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177340291
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,109 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +
    +    # first revert the conf/masters file
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # and then the conf/flink-conf.yaml
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_config() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # clean up the dir that will be used for zookeeper storage
    +    # (see high-availability.zookeeper.storageDir below)
    +    if [ -e $TEST_DATA_DIR/recovery ]; then
    +       echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
    +       rm -rf $TEST_DATA_DIR/recovery
    +    fi
    +
    +    # then move on to create the flink-conf.yaml
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 4
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # High Availability
    +    #==============================================================================
    +
    +    high-availability: zookeeper
    +    high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
    +    high-availability.zookeeper.quorum: localhost:2181
    +    high-availability.zookeeper.path.root: /flink
    +    high-availability.cluster-id: /test_cluster_one
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function start_ha_cluster {
    +    echo "Setting up HA Cluster..."
    +    create_ha_config
    +    start_local_zk
    +    start_cluster
    +}
    +
    +function start_local_zk {
    +    # Parses the zoo.cfg and starts locally zk.
    +
    +    # This is almost the same code as the
    +    # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
    +
    +    while read server ; do
    +        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
    +
    +        # match server.id=address[:port[:port]]
    +        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
    +            id=${BASH_REMATCH[1]}
    +            address=${BASH_REMATCH[2]}
    +
    +            if [ "${address}" != "localhost" ]; then
    +                echo "[WARN] Parse error. Only available for localhost."
    --- End diff --
    
    `[ERROR]` instead of `[WARN]`? Because we fail hard here, right?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176770591
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,108 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
    +
    +stop_cluster_and_watchdog() {
    +    kill ${watchdogPid} 2> /dev/null
    +    wait ${watchdogPid} 2> /dev/null
    +
    +    stop_ha_cluster
    +}
    +
    +verify_logs() {
    +    expectedRetries=$1
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${output} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
    +        PASS=""
    --- End diff --
    
    It is used by `check_all_pass` in `common.sh`. Kind of obscure, but this is the structure for now...
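    For readers of the thread, a rough sketch of the mechanism (the actual `check_all_pass` in common.sh may differ in detail): each check clears `PASS` on failure, and a final call fails the test run if `PASS` is empty:
    
        function check_all_pass {
            if [[ -z "$PASS" ]]; then
                echo "One or more tests FAILED."
                exit 1
            fi
            echo "All tests PASS."
        }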


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177353937
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---
    @@ -92,7 +96,29 @@ public static void main(String[] args) throws Exception {
     
     		// create the environment to create streams and configure execution
     		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -		env.enableCheckpointing(5000);
    +
    +		final String checkpointDir = params.getRequired("checkpointDir");
    --- End diff --
    
    We could also do this via the `flink-conf.yaml` and not make the parameters of the example too complicated.
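    
    I.e., the test script could append the checkpoint settings to the generated config instead of passing them as program arguments, roughly like this (sketch; the directory is a placeholder and the exact keys should be double-checked):
    
        state.backend: filesystem
        state.checkpoints.dir: file://${TEST_DATA_DIR}/checkpoints/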


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177335437
  
    --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
    @@ -93,6 +93,13 @@ if [ $EXIT_CODE == 0 ]; then
         EXIT_CODE=$?
     fi
     
    +#if [ $EXIT_CODE == 0 ]; then
    --- End diff --
    
    Remove this. This test is too complicated for pre-commit tests.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176725711
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_conf() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # then move on to create the flink-conf.yaml
    --- End diff --
    
    move comment down to `sed 's/^     //g' ...`


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176762986
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,6 +146,57 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
    +        done
    +        sleep 5;
    +    done
    +}
    +
    +function kill_jm {
    +    idx=$1
    +
    +    jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    jm_pids=(${jm_pids[@]})
    +
    +    pid=${jm_pids[$idx]}
    +
    +    # kill the JM and wait for the completion of its termination
    +    kill -9 ${pid}
    +
    +    echo "Killed JM @ ${pid}."
    +}
    +
    +function stop_ha_cluster {
    +    echo "Tearing down HA Cluster..."
    +    stop_cluster
    +    stop_local_zk
    +    cleanup
    +}
    +
    +function stop_local_zk {
    +    while read server ; do
    +        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
    +
    +        # match server.id=address[:port[:port]]
    +        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
    +            id=${BASH_REMATCH[1]}
    --- End diff --
    
    `server` is used in the warning message.


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177364376
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,109 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +
    +    # first revert the conf/masters file
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # and then the conf/flink-conf.yaml
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_config() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # clean up the dir that will be used for zookeeper storage
    +    # (see high-availability.zookeeper.storageDir below)
    +    if [ -e $TEST_DATA_DIR/recovery ]; then
    +       echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
    +       rm -rf $TEST_DATA_DIR/recovery
    +    fi
    +
    +    # then move on to create the flink-conf.yaml
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 4
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # High Availability
    +    #==============================================================================
    +
    +    high-availability: zookeeper
    +    high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
    +    high-availability.zookeeper.quorum: localhost:2181
    +    high-availability.zookeeper.path.root: /flink
    +    high-availability.cluster-id: /test_cluster_one
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function start_ha_cluster {
    +    echo "Setting up HA Cluster..."
    +    create_ha_config
    +    start_local_zk
    +    start_cluster
    +}
    +
    +function start_local_zk {
    +    # Parses the zoo.cfg and starts locally zk.
    +
    +    # This is almost the same code as the
    +    # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
    --- End diff --
    
    The problem is that the `zookeeper` script does not take into account the fact that we may want to launch `zookeeper` locally, so it always asks for `SSH` credentials.
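    
    So the test does the loop itself and starts each peer directly, roughly like this (sketch; the exact helper script name and arguments may differ):
    
        # instead of ssh-ing into ${address} like start-zookeeper-quorum.sh does:
        "${FLINK_DIR}"/bin/zookeeper.sh start ${id}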


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176726476
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_conf() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # then move on to create the flink-conf.yaml
    +
    +    if [ -e $TEST_DATA_DIR/recovery ]; then
    +       echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
    +       rm -rf $TEST_DATA_DIR/recovery
    +    fi
    +
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 4
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # High Availability
    +    #==============================================================================
    +
    +    high-availability: zookeeper
    +    high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
    +    high-availability.zookeeper.quorum: localhost:2181
    +    high-availability.zookeeper.path.root: /flink
    +    high-availability.cluster-id: /test_cluster_one
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function start_ha_cluster {
    +    echo "Setting up HA Cluster..."
    +    create_ha_conf
    +    start_local_zk
    +    start_cluster
    +}
    +
    +function start_local_zk {
    +    while read server ; do
    +        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
    +
    +        # match server.id=address[:port[:port]]
    +        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
    +            id=${BASH_REMATCH[1]}
    +            address=${BASH_REMATCH[2]}
    --- End diff --
    
    `address` seems to be unused


---

[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5750
  
    Hi @walterddr, I addressed most of your comments. Feel free to have another look.



---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r177343812
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
    @@ -0,0 +1,119 @@
    +#!/usr/bin/env bash
    +
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
    --- End diff --
    
    Remove this?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176725915
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -39,6 +39,93 @@ cd $TEST_ROOT
     export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
     echo "TEST_DATA_DIR: $TEST_DATA_DIR"
     
    +function revert_default_config() {
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 1
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function create_ha_conf() {
    +
    +    # create the masters file (only one currently).
    +    # This must have all the masters to be used in HA.
    +    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
    +
    +    # then move on to create the flink-conf.yaml
    +
    +    if [ -e $TEST_DATA_DIR/recovery ]; then
    +       echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
    +       rm -rf $TEST_DATA_DIR/recovery
    +    fi
    +
    +    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
    +    #==============================================================================
    +    # Common
    +    #==============================================================================
    +
    +    jobmanager.rpc.address: localhost
    +    jobmanager.rpc.port: 6123
    +    jobmanager.heap.mb: 1024
    +    taskmanager.heap.mb: 1024
    +    taskmanager.numberOfTaskSlots: 4
    +    parallelism.default: 1
    +
    +    #==============================================================================
    +    # High Availability
    +    #==============================================================================
    +
    +    high-availability: zookeeper
    +    high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
    +    high-availability.zookeeper.quorum: localhost:2181
    +    high-availability.zookeeper.path.root: /flink
    +    high-availability.cluster-id: /test_cluster_one
    +
    +    #==============================================================================
    +    # Web Frontend
    +    #==============================================================================
    +
    +    web.port: 8081
    +EOL
    +}
    +
    +function start_ha_cluster {
    +    echo "Setting up HA Cluster..."
    +    create_ha_conf
    +    start_local_zk
    +    start_cluster
    +}
    +
    +function start_local_zk {
    --- End diff --
    
    add a few comments to this method?


---

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5750#discussion_r176767031
  
    --- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
    @@ -59,6 +146,57 @@ function start_cluster {
       done
     }
     
    +function jm_watchdog() {
    +    expectedJms=$1
    +    ipPort=$2
    +
    +    while true; do
    +        runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        missingJms=$((expectedJms-runningJms))
    +        for (( c=0; c<missingJms; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" $2
    +        done
    +        sleep 5;
    +    done
    +}
    +
    +function kill_jm {
    +    idx=$1
    +
    +    jm_pids=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    jm_pids=(${jm_pids[@]})
    +
    +    pid=${jm_pids[$idx]}
    +
    +    # kill the JM and wait for the completion of its termination
    +    kill -9 ${pid}
    +
    +    echo "Killed JM @ ${pid}."
    +}
    +
    +function stop_ha_cluster {
    +    echo "Tearing down HA Cluster..."
    +    stop_cluster
    +    stop_local_zk
    +    cleanup
    +}
    +
    +function stop_local_zk {
    +    while read server ; do
    +        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
    +
    +        # match server.id=address[:port[:port]]
    +        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
    +            id=${BASH_REMATCH[1]}
    --- End diff --
    
    But the assignment should never happen if we enter the `else` branch.
    There is another assignment to `server` outside of the condition, so the warning message can still reference it.
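    
    Just to make the structure explicit (sketch of the shape I mean):
    
        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # ${server} is set here, before the condition
    
        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
            id=${BASH_REMATCH[1]}   # only set when the line matches
            # ... stop the peer with ${id} ...
        else
            echo "[WARN] Skipping unparsable zoo.cfg entry: ${server}"   # ${id} is never set on this path
        fi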


---