You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/30 02:11:05 UTC
[beam] branch master updated: Got rid of the WindmillServiceUseStreamingRpcs option from StreamingD… (#7631)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 93821dd Got rid of the WindmillServiceUseStreamingRpcs option from StreamingD… (#7631)
93821dd is described below
commit 93821dd2da461d3cf9238888881e182151dc06ca
Author: drieber <dr...@google.com>
AuthorDate: Tue Jan 29 18:10:55 2019 -0800
Got rid of the WindmillServiceUseStreamingRpcs option from StreamingDataflowWorkerOptions (#7631)
* Got rid of the WindmillServiceUseStreamingRpcs option from StreamingDataflowWorkerOptions. Streaming engine now always uses streaming RPCs. Appliance never uses streaming RPCs.
* Initial implementation of FakeWindmillServer getWorkStream and commitWorkStream.
* Run ./gradlew spotlessApply
* Fixed tests by implementing FakeWindmillServer.getWorkStream and FakeWindmillServer.commitWorkStream. It was also necessary to call dispatchThread.interrupt() from StreamingDataflowWorker.stop(); otherwise the dispatch thread would be stuck forever in the fake GetWorkStream's closeAfterDefaultTimeout, which only returns once the stream is closed.
---
.../dataflow/worker/StreamingDataflowWorker.java | 13 ++-
.../options/StreamingDataflowWorkerOptions.java | 19 -----
.../worker/windmill/WindmillServerStub.java | 1 +
.../dataflow/worker/FakeWindmillServer.java | 98 ++++++++++++++++++++--
4 files changed, 98 insertions(+), 33 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index fb0e8bb..ab31b9b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -398,8 +398,6 @@ public class StreamingDataflowWorker {
private final MemoryMonitor memoryMonitor;
private final Thread memoryMonitorThread;
- private final boolean useStreamingRpcs;
-
private final WorkerStatusPages statusPages;
// Periodic sender of debug information to the debug capture service.
private DebugCapture.Manager debugCaptureManager = null;
@@ -585,7 +583,7 @@ public class StreamingDataflowWorker {
@Override
public void run() {
LOG.info("Dispatch starting");
- if (useStreamingRpcs) {
+ if (windmillServiceEnabled) {
streamingDispatchLoop();
} else {
dispatchLoop();
@@ -601,7 +599,7 @@ public class StreamingDataflowWorker {
new Runnable() {
@Override
public void run() {
- if (useStreamingRpcs) {
+ if (windmillServiceEnabled) {
streamingCommitLoop();
} else {
commitLoop();
@@ -611,11 +609,10 @@ public class StreamingDataflowWorker {
commitThread.setPriority(Thread.MAX_PRIORITY);
commitThread.setName("CommitThread");
- this.useStreamingRpcs = options.getWindmillServiceUseStreamingRpcs();
this.publishCounters = publishCounters;
this.windmillServer = options.getWindmillServerStub();
this.metricTrackingWindmillServer =
- new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, useStreamingRpcs);
+ new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled);
this.stateFetcher = new StateFetcher(metricTrackingWindmillServer);
this.clientId = new Random().nextLong();
@@ -660,7 +657,6 @@ public class StreamingDataflowWorker {
}
LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
- LOG.debug("useStreamingRpcs: {}", useStreamingRpcs);
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
@@ -812,6 +808,7 @@ public class StreamingDataflowWorker {
debugCaptureManager.stop();
}
running.set(false);
+ dispatchThread.interrupt();
dispatchThread.join();
// We need to interrupt the commitThread in case it is blocking on pulling
// from the commitQueue.
@@ -1391,7 +1388,7 @@ public class StreamingDataflowWorker {
// Batch commits as long as there are more and we can fit them in the current request.
CommitWorkStream commitStream = streamPool.getStream();
int commits = 0;
- while (true) {
+ while (running.get()) {
// There may be a commit left over from the previous iteration but if not, pull one.
if (commit == null) {
try {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
index 20ccc35..a136d7e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
@@ -88,12 +88,6 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt
void setPeriodicStatusPageOutputDirectory(String directory);
- @Description("If true, will use streaming RPCs with windmill service")
- @Default.InstanceFactory(WindmillServiceUseStreamingRpcsFactory.class)
- boolean getWindmillServiceUseStreamingRpcs();
-
- void setWindmillServiceUseStreamingRpcs(boolean value);
-
@Description("Streaming requests will be batched into messages up to this limit.")
@Default.InstanceFactory(WindmillServiceStreamingRpcBatchLimitFactory.class)
int getWindmillServiceStreamingRpcBatchLimit();
@@ -199,19 +193,6 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt
}
}
- /** Factory for setting value of WindmillServiceUseStreamingRpcs based on environment. */
- public static class WindmillServiceUseStreamingRpcsFactory
- implements DefaultValueFactory<Boolean> {
- @Override
- public Boolean create(PipelineOptions options) {
- StreamingDataflowWorkerOptions streamingOptions =
- options.as(StreamingDataflowWorkerOptions.class);
- return streamingEngineEnabled(streamingOptions)
- && hasExperiment(streamingOptions, "windmill_service_streaming_rpcs")
- && !hasExperiment(streamingOptions, "windmill_service_disable_streaming_rpcs");
- }
- }
-
/** Factory for setting value of WindmillServiceStreamingRpcBatchLimit based on environment. */
public static class WindmillServiceStreamingRpcBatchLimitFactory
implements DefaultValueFactory<Integer> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index ce5a058..f14451c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -153,6 +153,7 @@ public abstract class WindmillServerStub implements StatusDataProvider {
/** Flushes any pending work items to the wire. */
void flush();
}
+
/**
* Pool of homogeneous streams to Windmill.
*
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 51327b0..bb46b78 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -31,11 +31,9 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
@@ -48,6 +46,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitR
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.vendor.guava.v20_0.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
+import org.joda.time.Instant;
import org.junit.rules.ErrorCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +57,7 @@ class FakeWindmillServer extends WindmillServerStub {
private final Queue<Windmill.GetWorkResponse> workToOffer;
private final Queue<Function<GetDataRequest, GetDataResponse>> dataToOffer;
+ // Keys are work tokens.
private final Map<Long, WorkItemCommitRequest> commitsReceived;
private final ArrayList<Windmill.ReportStatsRequest> statsReceived;
private final LinkedBlockingQueue<Windmill.Exception> exceptions;
@@ -180,7 +180,55 @@ class FakeWindmillServer extends WindmillServerStub {
@Override
public GetWorkStream getWorkStream(Windmill.GetWorkRequest request, WorkItemReceiver receiver) {
- throw new UnsupportedOperationException();
+ LOG.debug("getWorkStream: {}", request.toString());
+ Instant startTime = Instant.now();
+ final CountDownLatch done = new CountDownLatch(1);
+ return new GetWorkStream() {
+ @Override
+ public void closeAfterDefaultTimeout() {
+ while (done.getCount() > 0) {
+ Windmill.GetWorkResponse response = workToOffer.poll();
+ if (response == null) {
+ try {
+ sleepMillis(500);
+ } catch (InterruptedException e) {
+ close();
+ Thread.currentThread().interrupt();
+ }
+ continue;
+ }
+ for (Windmill.ComputationWorkItems computationWork : response.getWorkList()) {
+ Instant inputDataWatermark =
+ WindmillTimeUtils.windmillToHarnessWatermark(
+ computationWork.getInputDataWatermark());
+ for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
+ receiver.receiveWork(
+ computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ done.countDown();
+ }
+
+ @Override
+ public void awaitTermination() throws InterruptedException {
+ done.await();
+ }
+
+ @Override
+ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException {
+ return done.await(time, unit);
+ }
+
+ @Override
+ public Instant startTime() {
+ return startTime;
+ }
+ };
}
@Override
@@ -190,7 +238,45 @@ class FakeWindmillServer extends WindmillServerStub {
@Override
public CommitWorkStream commitWorkStream() {
- throw new UnsupportedOperationException();
+ Instant startTime = Instant.now();
+ return new CommitWorkStream() {
+ @Override
+ public boolean commitWorkItem(
+ String computation,
+ WorkItemCommitRequest request,
+ Consumer<Windmill.CommitStatus> onDone) {
+ LOG.debug("commitWorkStream::commitWorkItem: {}", request);
+ errorCollector.checkThat(request.hasWorkToken(), equalTo(true));
+ errorCollector.checkThat(
+ request.getShardingKey(), allOf(greaterThan(0L), lessThan(Long.MAX_VALUE)));
+ errorCollector.checkThat(request.getCacheToken(), not(equalTo(0L)));
+ commitsReceived.put(request.getWorkToken(), request);
+ onDone.accept(Windmill.CommitStatus.OK);
+ return true; // The request was accepted.
+ }
+
+ @Override
+ public void flush() {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void awaitTermination() {}
+
+ @Override
+ public boolean awaitTermination(int time, TimeUnit unit) {
+ return true;
+ }
+
+ @Override
+ public void closeAfterDefaultTimeout() {}
+
+ @Override
+ public Instant startTime() {
+ return startTime;
+ }
+ };
}
public void waitForEmptyWorkQueue() {