You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/01 18:58:15 UTC
[beam] branch master updated: Add nexmark option to allow cancel streaming query job after complete
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78be15f Add nexmark option to allow cancel streaming query job after complete
new ea928a2 Merge pull request #12119 from y1chi/nexmark
78be15f is described below
commit 78be15f5efe11d9dd0476d181430b55569caccd0
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Mon Jun 29 11:25:35 2020 -0700
Add nexmark option to allow cancel streaming query job after complete
---
.../src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 3 +++
.../src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java | 6 ++++++
2 files changed, 9 insertions(+)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 66385af..619a517 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -480,6 +480,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
&& quietFor.isLongerThan(DONE_DELAY)) {
NexmarkUtils.console("streaming query appears to have finished waiting for completion.");
waitingForShutdown = true;
+ if (options.getCancelStreamingJobAfterFinish()) {
+ cancelJob = true;
+ }
} else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
NexmarkUtils.console(
"ERROR: streaming query appears to have been stuck for %d minutes, cancelling job.",
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index db9b97e..ea2be42 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -139,6 +139,12 @@ public interface NexmarkOptions
void setStreamTimeout(Integer streamTimeout);
+ @Description("Proactively cancels streaming job after query is completed")
+ @Default.Boolean(false)
+ boolean getCancelStreamingJobAfterFinish();
+
+ void setCancelStreamingJobAfterFinish(boolean cancelStreamingJobAfterFinish);
+
@Description("Number of unbounded sources to create events.")
@Nullable
Integer getNumEventGenerators();