You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/01 18:58:15 UTC

[beam] branch master updated: Add nexmark option to allow cancel streaming query job after complete

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78be15f  Add nexmark option to allow cancel streaming query job after complete
     new ea928a2  Merge pull request #12119 from y1chi/nexmark
78be15f is described below

commit 78be15f5efe11d9dd0476d181430b55569caccd0
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Mon Jun 29 11:25:35 2020 -0700

    Add nexmark option to allow cancel streaming query job after complete
---
 .../src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java  | 3 +++
 .../src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java   | 6 ++++++
 2 files changed, 9 insertions(+)

diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 66385af..619a517 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -480,6 +480,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
             && quietFor.isLongerThan(DONE_DELAY)) {
           NexmarkUtils.console("streaming query appears to have finished waiting for completion.");
           waitingForShutdown = true;
+          if (options.getCancelStreamingJobAfterFinish()) {
+            cancelJob = true;
+          }
         } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
           NexmarkUtils.console(
               "ERROR: streaming query appears to have been stuck for %d minutes, cancelling job.",
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index db9b97e..ea2be42 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -139,6 +139,12 @@ public interface NexmarkOptions
 
   void setStreamTimeout(Integer streamTimeout);
 
+  @Description("Proactively cancels streaming job after query is completed")
+  @Default.Boolean(false)
+  boolean getCancelStreamingJobAfterFinish();
+
+  void setCancelStreamingJobAfterFinish(boolean cancelStreamingJobAfterFinish);
+
   @Description("Number of unbounded sources to create events.")
   @Nullable
   Integer getNumEventGenerators();