You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:28 UTC

[20/55] [abbrv] beam git commit: Fix Spark streaming termination via waitUntilFinish and timeout config

Fix Spark streaming termination via waitUntilFinish and timeout config

issue #39


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10d5783
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10d5783
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10d5783

Branch: refs/heads/master
Commit: e10d5783d8c8ed32008e29d99d5a4b1dd3e408a6
Parents: 8098bb1
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Fri Apr 14 17:13:59 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 .../nexmark/NexmarkConfiguration.java           | 61 +++++++++++++++++---
 .../integration/nexmark/NexmarkOptions.java     |  7 +++
 .../beam/integration/nexmark/NexmarkRunner.java |  2 +-
 3 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index d6cd808..1da08b4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -106,6 +106,12 @@ public class NexmarkConfiguration implements Serializable {
   public int preloadSeconds = 0;
 
   /**
+   * Timeout, in seconds, to wait for streaming pipelines to stop.
+   */
+  @JsonProperty
+  public int streamTimeout = 240;
+
+  /**
    * If true, and in streaming mode, generate events only when they are due according to their
    * timestamp.
    */
@@ -275,6 +281,9 @@ public class NexmarkConfiguration implements Serializable {
     if (options.getPreloadSeconds() != null) {
       preloadSeconds = options.getPreloadSeconds();
     }
+    if (options.getStreamTimeout() != null) {
+      streamTimeout = options.getStreamTimeout();
+    }
     if (options.getIsRateLimited() != null) {
       isRateLimited = options.getIsRateLimited();
     }
@@ -368,6 +377,7 @@ public class NexmarkConfiguration implements Serializable {
     result.rateUnit = rateUnit;
     result.ratePeriodSec = ratePeriodSec;
     result.preloadSeconds = preloadSeconds;
+    result.streamTimeout = streamTimeout;
     result.isRateLimited = isRateLimited;
     result.useWallclockEventTime = useWallclockEventTime;
     result.avgPersonByteSize = avgPersonByteSize;
@@ -436,6 +446,9 @@ public class NexmarkConfiguration implements Serializable {
     if (preloadSeconds != DEFAULT.preloadSeconds) {
       sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
     }
+    if (streamTimeout != DEFAULT.streamTimeout) {
+      sb.append(String.format("; streamTimeout:%d", streamTimeout));
+    }
     if (isRateLimited != DEFAULT.isRateLimited) {
       sb.append(String.format("; isRateLimited:%s", isRateLimited));
     }
@@ -536,13 +549,44 @@ public class NexmarkConfiguration implements Serializable {
 
   @Override
   public int hashCode() {
-    return Objects.hash(debug, query, sourceType, sinkType, pubSubMode,
-        numEvents, numEventGenerators, rateShape, firstEventRate, nextEventRate, rateUnit,
-        ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
-        avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
-        windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
-        coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime,
-        occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
+    return Objects.hash(
+        debug,
+        query,
+        sourceType,
+        sinkType,
+        pubSubMode,
+        numEvents,
+        numEventGenerators,
+        rateShape,
+        firstEventRate,
+        nextEventRate,
+        rateUnit,
+        ratePeriodSec,
+        preloadSeconds,
+        streamTimeout,
+        isRateLimited,
+        useWallclockEventTime,
+        avgPersonByteSize,
+        avgAuctionByteSize,
+        avgBidByteSize,
+        hotAuctionRatio,
+        hotSellersRatio,
+        hotBiddersRatio,
+        windowSizeSec,
+        windowPeriodSec,
+        watermarkHoldbackSec,
+        numInFlightAuctions,
+        numActivePeople,
+        coderStrategy,
+        cpuDelayMs,
+        diskBusyBytes,
+        auctionSkip,
+        fanout,
+        maxAuctionsWaitingTime,
+        occasionalDelaySec,
+        probDelayedEvent,
+        maxLogEvents,
+        usePubsubPublishTime,
         outOfOrderGroupSize);
   }
 
@@ -630,6 +674,9 @@ public class NexmarkConfiguration implements Serializable {
     if (preloadSeconds != other.preloadSeconds) {
       return false;
     }
+    if (streamTimeout != other.streamTimeout) {
+      return false;
+    }
     if (Double.doubleToLongBits(probDelayedEvent)
         != Double.doubleToLongBits(other.probDelayedEvent)) {
       return false;

http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index e39f0a4..5d093ae 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -110,6 +110,13 @@ public interface NexmarkOptions extends PubsubOptions {
 
   void setPreloadSeconds(Integer preloadSeconds);
 
+  @Description(
+      "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode")
+  @Nullable
+  Integer getStreamTimeout();
+
+  void setStreamTimeout(Integer streamTimeout);
+
   @Description("Number of unbounded sources to create events.")
   @Nullable
   Integer getNumEventGenerators();

http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 3a0452f..ef5f0e2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -1230,7 +1230,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
         waitForPublisherPreload();
       }
       mainResult = p.run();
-      mainResult.waitUntilFinish();
+      mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
       return monitor(query);
     } finally {
       //