You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/30 02:11:05 UTC

[beam] branch master updated: Got rid of the WindmillServiceUseStreamingRpcs option from StreamingDataflowWorkerOptions (#7631)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 93821dd  Got rid of the WindmillServiceUseStreamingRpcs option from StreamingD… (#7631)
93821dd is described below

commit 93821dd2da461d3cf9238888881e182151dc06ca
Author: drieber <dr...@google.com>
AuthorDate: Tue Jan 29 18:10:55 2019 -0800

    Got rid of the WindmillServiceUseStreamingRpcs option from StreamingDataflowWorkerOptions (#7631)
    
    * Removed the WindmillServiceUseStreamingRpcs option from StreamingDataflowWorkerOptions. Streaming Engine now always uses streaming RPCs; Appliance never does.
    
    * Initial implementation of FakeWindmillServer getWorkStream and commitWorkStream.
    
    * Ran ./gradlew spotlessApply
    
    * Fixed tests by implementing FakeWindmillServer.getWorkStream and FakeWindmillServer.commitWorkStream. It was necessary to add a call to dispatchThread.interrupt() in StreamingDataflowWorker.stop(); otherwise the CommitWorkStream would be stuck forever in closeAfterDefaultTimeout.
---
 .../dataflow/worker/StreamingDataflowWorker.java   | 13 ++-
 .../options/StreamingDataflowWorkerOptions.java    | 19 -----
 .../worker/windmill/WindmillServerStub.java        |  1 +
 .../dataflow/worker/FakeWindmillServer.java        | 98 ++++++++++++++++++++--
 4 files changed, 98 insertions(+), 33 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index fb0e8bb..ab31b9b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -398,8 +398,6 @@ public class StreamingDataflowWorker {
   private final MemoryMonitor memoryMonitor;
   private final Thread memoryMonitorThread;
 
-  private final boolean useStreamingRpcs;
-
   private final WorkerStatusPages statusPages;
   // Periodic sender of debug information to the debug capture service.
   private DebugCapture.Manager debugCaptureManager = null;
@@ -585,7 +583,7 @@ public class StreamingDataflowWorker {
               @Override
               public void run() {
                 LOG.info("Dispatch starting");
-                if (useStreamingRpcs) {
+                if (windmillServiceEnabled) {
                   streamingDispatchLoop();
                 } else {
                   dispatchLoop();
@@ -601,7 +599,7 @@ public class StreamingDataflowWorker {
             new Runnable() {
               @Override
               public void run() {
-                if (useStreamingRpcs) {
+                if (windmillServiceEnabled) {
                   streamingCommitLoop();
                 } else {
                   commitLoop();
@@ -611,11 +609,10 @@ public class StreamingDataflowWorker {
     commitThread.setPriority(Thread.MAX_PRIORITY);
     commitThread.setName("CommitThread");
 
-    this.useStreamingRpcs = options.getWindmillServiceUseStreamingRpcs();
     this.publishCounters = publishCounters;
     this.windmillServer = options.getWindmillServerStub();
     this.metricTrackingWindmillServer =
-        new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, useStreamingRpcs);
+        new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled);
     this.stateFetcher = new StateFetcher(metricTrackingWindmillServer);
     this.clientId = new Random().nextLong();
 
@@ -660,7 +657,6 @@ public class StreamingDataflowWorker {
     }
 
     LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
-    LOG.debug("useStreamingRpcs: {}", useStreamingRpcs);
     LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
@@ -812,6 +808,7 @@ public class StreamingDataflowWorker {
         debugCaptureManager.stop();
       }
       running.set(false);
+      dispatchThread.interrupt();
       dispatchThread.join();
       // We need to interrupt the commitThread in case it is blocking on pulling
       // from the commitQueue.
@@ -1391,7 +1388,7 @@ public class StreamingDataflowWorker {
       // Batch commits as long as there are more and we can fit them in the current request.
       CommitWorkStream commitStream = streamPool.getStream();
       int commits = 0;
-      while (true) {
+      while (running.get()) {
         // There may be a commit left over from the previous iteration but if not, pull one.
         if (commit == null) {
           try {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
index 20ccc35..a136d7e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
@@ -88,12 +88,6 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt
 
   void setPeriodicStatusPageOutputDirectory(String directory);
 
-  @Description("If true, will use streaming RPCs with windmill service")
-  @Default.InstanceFactory(WindmillServiceUseStreamingRpcsFactory.class)
-  boolean getWindmillServiceUseStreamingRpcs();
-
-  void setWindmillServiceUseStreamingRpcs(boolean value);
-
   @Description("Streaming requests will be batched into messages up to this limit.")
   @Default.InstanceFactory(WindmillServiceStreamingRpcBatchLimitFactory.class)
   int getWindmillServiceStreamingRpcBatchLimit();
@@ -199,19 +193,6 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt
     }
   }
 
-  /** Factory for setting value of WindmillServiceUseStreamingRpcs based on environment. */
-  public static class WindmillServiceUseStreamingRpcsFactory
-      implements DefaultValueFactory<Boolean> {
-    @Override
-    public Boolean create(PipelineOptions options) {
-      StreamingDataflowWorkerOptions streamingOptions =
-          options.as(StreamingDataflowWorkerOptions.class);
-      return streamingEngineEnabled(streamingOptions)
-          && hasExperiment(streamingOptions, "windmill_service_streaming_rpcs")
-          && !hasExperiment(streamingOptions, "windmill_service_disable_streaming_rpcs");
-    }
-  }
-
   /** Factory for setting value of WindmillServiceStreamingRpcBatchLimit based on environment. */
   public static class WindmillServiceStreamingRpcBatchLimitFactory
       implements DefaultValueFactory<Integer> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index ce5a058..f14451c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -153,6 +153,7 @@ public abstract class WindmillServerStub implements StatusDataProvider {
     /** Flushes any pending work items to the wire. */
     void flush();
   }
+
   /**
    * Pool of homogeneous streams to Windmill.
    *
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 51327b0..bb46b78 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -31,11 +31,9 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
@@ -48,6 +46,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitR
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.net.HostAndPort;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
+import org.joda.time.Instant;
 import org.junit.rules.ErrorCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +57,7 @@ class FakeWindmillServer extends WindmillServerStub {
 
   private final Queue<Windmill.GetWorkResponse> workToOffer;
   private final Queue<Function<GetDataRequest, GetDataResponse>> dataToOffer;
+  // Keys are work tokens.
   private final Map<Long, WorkItemCommitRequest> commitsReceived;
   private final ArrayList<Windmill.ReportStatsRequest> statsReceived;
   private final LinkedBlockingQueue<Windmill.Exception> exceptions;
@@ -180,7 +180,55 @@ class FakeWindmillServer extends WindmillServerStub {
 
   @Override
   public GetWorkStream getWorkStream(Windmill.GetWorkRequest request, WorkItemReceiver receiver) {
-    throw new UnsupportedOperationException();
+    LOG.debug("getWorkStream: {}", request.toString());
+    Instant startTime = Instant.now();
+    final CountDownLatch done = new CountDownLatch(1);
+    return new GetWorkStream() {
+      @Override
+      public void closeAfterDefaultTimeout() {
+        while (done.getCount() > 0) {
+          Windmill.GetWorkResponse response = workToOffer.poll();
+          if (response == null) {
+            try {
+              sleepMillis(500);
+            } catch (InterruptedException e) {
+              close();
+              Thread.currentThread().interrupt();
+            }
+            continue;
+          }
+          for (Windmill.ComputationWorkItems computationWork : response.getWorkList()) {
+            Instant inputDataWatermark =
+                WindmillTimeUtils.windmillToHarnessWatermark(
+                    computationWork.getInputDataWatermark());
+            for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
+              receiver.receiveWork(
+                  computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void close() {
+        done.countDown();
+      }
+
+      @Override
+      public void awaitTermination() throws InterruptedException {
+        done.await();
+      }
+
+      @Override
+      public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException {
+        return done.await(time, unit);
+      }
+
+      @Override
+      public Instant startTime() {
+        return startTime;
+      }
+    };
   }
 
   @Override
@@ -190,7 +238,45 @@ class FakeWindmillServer extends WindmillServerStub {
 
   @Override
   public CommitWorkStream commitWorkStream() {
-    throw new UnsupportedOperationException();
+    Instant startTime = Instant.now();
+    return new CommitWorkStream() {
+      @Override
+      public boolean commitWorkItem(
+          String computation,
+          WorkItemCommitRequest request,
+          Consumer<Windmill.CommitStatus> onDone) {
+        LOG.debug("commitWorkStream::commitWorkItem: {}", request);
+        errorCollector.checkThat(request.hasWorkToken(), equalTo(true));
+        errorCollector.checkThat(
+            request.getShardingKey(), allOf(greaterThan(0L), lessThan(Long.MAX_VALUE)));
+        errorCollector.checkThat(request.getCacheToken(), not(equalTo(0L)));
+        commitsReceived.put(request.getWorkToken(), request);
+        onDone.accept(Windmill.CommitStatus.OK);
+        return true; // The request was accepted.
+      }
+
+      @Override
+      public void flush() {}
+
+      @Override
+      public void close() {}
+
+      @Override
+      public void awaitTermination() {}
+
+      @Override
+      public boolean awaitTermination(int time, TimeUnit unit) {
+        return true;
+      }
+
+      @Override
+      public void closeAfterDefaultTimeout() {}
+
+      @Override
+      public Instant startTime() {
+        return startTime;
+      }
+    };
   }
 
   public void waitForEmptyWorkQueue() {