You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:08:56 UTC

[flink-statefun] 05/17: [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e518ea0717dc54b9e4d6af96aa1839df38f3fa1b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 14 12:50:35 2020 +0800

    [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers
---
 .../e2e/common/StatefulFunctionsAppContainers.java       | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 9894c2d..f03a5de 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -168,6 +168,22 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return master.getMappedPort(8081);
   }
 
+  /**
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
+   */
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
+    }
+
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
   private static File temporaryCheckpointDir() throws IOException {
     final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
     return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();