You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:28 UTC
[20/55] [abbrv] beam git commit: Fix Spark streaming termination via
waitUntilFinish and timeout config
Fix Spark streaming termination via waitUntilFinish and timeout config
issue #39
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10d5783
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10d5783
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10d5783
Branch: refs/heads/master
Commit: e10d5783d8c8ed32008e29d99d5a4b1dd3e408a6
Parents: 8098bb1
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Fri Apr 14 17:13:59 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
.../nexmark/NexmarkConfiguration.java | 61 +++++++++++++++++---
.../integration/nexmark/NexmarkOptions.java | 7 +++
.../beam/integration/nexmark/NexmarkRunner.java | 2 +-
3 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index d6cd808..1da08b4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -106,6 +106,12 @@ public class NexmarkConfiguration implements Serializable {
public int preloadSeconds = 0;
/**
+ * Timeout for stream pipelines to stop in seconds.
+ */
+ @JsonProperty
+ public int streamTimeout = 240;
+
+ /**
* If true, and in streaming mode, generate events only when they are due according to their
* timestamp.
*/
@@ -275,6 +281,9 @@ public class NexmarkConfiguration implements Serializable {
if (options.getPreloadSeconds() != null) {
preloadSeconds = options.getPreloadSeconds();
}
+ if (options.getStreamTimeout() != null) {
+ streamTimeout = options.getStreamTimeout();
+ }
if (options.getIsRateLimited() != null) {
isRateLimited = options.getIsRateLimited();
}
@@ -368,6 +377,7 @@ public class NexmarkConfiguration implements Serializable {
result.rateUnit = rateUnit;
result.ratePeriodSec = ratePeriodSec;
result.preloadSeconds = preloadSeconds;
+ result.streamTimeout = streamTimeout;
result.isRateLimited = isRateLimited;
result.useWallclockEventTime = useWallclockEventTime;
result.avgPersonByteSize = avgPersonByteSize;
@@ -436,6 +446,9 @@ public class NexmarkConfiguration implements Serializable {
if (preloadSeconds != DEFAULT.preloadSeconds) {
sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
}
+ if (streamTimeout != DEFAULT.streamTimeout) {
+ sb.append(String.format("; streamTimeout:%d", streamTimeout));
+ }
if (isRateLimited != DEFAULT.isRateLimited) {
sb.append(String.format("; isRateLimited:%s", isRateLimited));
}
@@ -536,13 +549,44 @@ public class NexmarkConfiguration implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(debug, query, sourceType, sinkType, pubSubMode,
- numEvents, numEventGenerators, rateShape, firstEventRate, nextEventRate, rateUnit,
- ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
- avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
- windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
- coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime,
- occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
+ return Objects.hash(
+ debug,
+ query,
+ sourceType,
+ sinkType,
+ pubSubMode,
+ numEvents,
+ numEventGenerators,
+ rateShape,
+ firstEventRate,
+ nextEventRate,
+ rateUnit,
+ ratePeriodSec,
+ preloadSeconds,
+ streamTimeout,
+ isRateLimited,
+ useWallclockEventTime,
+ avgPersonByteSize,
+ avgAuctionByteSize,
+ avgBidByteSize,
+ hotAuctionRatio,
+ hotSellersRatio,
+ hotBiddersRatio,
+ windowSizeSec,
+ windowPeriodSec,
+ watermarkHoldbackSec,
+ numInFlightAuctions,
+ numActivePeople,
+ coderStrategy,
+ cpuDelayMs,
+ diskBusyBytes,
+ auctionSkip,
+ fanout,
+ maxAuctionsWaitingTime,
+ occasionalDelaySec,
+ probDelayedEvent,
+ maxLogEvents,
+ usePubsubPublishTime,
outOfOrderGroupSize);
}
@@ -630,6 +674,9 @@ public class NexmarkConfiguration implements Serializable {
if (preloadSeconds != other.preloadSeconds) {
return false;
}
+ if (streamTimeout != other.streamTimeout) {
+ return false;
+ }
if (Double.doubleToLongBits(probDelayedEvent)
!= Double.doubleToLongBits(other.probDelayedEvent)) {
return false;
http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index e39f0a4..5d093ae 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -110,6 +110,13 @@ public interface NexmarkOptions extends PubsubOptions {
void setPreloadSeconds(Integer preloadSeconds);
+ @Description(
+ "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode")
+ @Nullable
+ Integer getStreamTimeout();
+
+ void setStreamTimeout(Integer preloadSeconds);
+
@Description("Number of unbounded sources to create events.")
@Nullable
Integer getNumEventGenerators();
http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 3a0452f..ef5f0e2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -1230,7 +1230,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
waitForPublisherPreload();
}
mainResult = p.run();
- mainResult.waitUntilFinish();
+ mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
return monitor(query);
} finally {
//