You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/03/05 18:13:42 UTC

[beam] branch master updated: [BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue in QueueingBeamFnDataClient.

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccdd32  [BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue in QueueingBeamFnDataClient.
     new 1107fc1  Merge pull request #7895: [BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue in QueueingBeamFnDataClient
5ccdd32 is described below

commit 5ccdd322975b6d63197b976a60f0f6a1c34c181d
Author: Alex Amato <aj...@google.com>
AuthorDate: Tue Feb 19 11:25:04 2019 -0800

    [BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue
    in QueueingBeamFnDataClient.
---
 .../sdk/fn/data/CompletableFutureInboundDataClient.java    |  4 +++-
 .../beam/fn/harness/data/QueueingBeamFnDataClient.java     | 14 ++++++++++----
 .../beam/fn/harness/data/QueueingBeamFnDataClientTest.java | 12 ++++++------
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
index 463a168..5f02e6e 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
@@ -65,6 +65,8 @@ public class CompletableFutureInboundDataClient implements InboundDataClient {
 
   @Override
   public void fail(Throwable t) {
-    future.completeExceptionally(t);
+    // Use obtrudeException instead of completeExceptionally, forcing any future calls to .get()
+    // to raise the exception, even if the future is already completed.
+    future.obtrudeException(t);
   }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
index 194672d..ebcb903 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
@@ -18,7 +18,7 @@
 package org.apache.beam.fn.harness.data;
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -39,15 +39,17 @@ import org.slf4j.LoggerFactory;
  */
 public class QueueingBeamFnDataClient implements BeamFnDataClient {
 
+  private static final int QUEUE_SIZE = 1000;
+
   private static final Logger LOG = LoggerFactory.getLogger(QueueingBeamFnDataClient.class);
 
   private final BeamFnDataClient mainClient;
-  private final SynchronousQueue<ConsumerAndData> queue;
+  private final LinkedBlockingQueue<ConsumerAndData> queue;
   private final ConcurrentHashMap<InboundDataClient, Object> inboundDataClients;
 
   public QueueingBeamFnDataClient(BeamFnDataClient mainClient) {
     this.mainClient = mainClient;
-    this.queue = new SynchronousQueue<>();
+    this.queue = new LinkedBlockingQueue<ConsumerAndData>(QUEUE_SIZE);
     this.inboundDataClients = new ConcurrentHashMap<>();
   }
 
@@ -71,13 +73,17 @@ public class QueueingBeamFnDataClient implements BeamFnDataClient {
     return inboundDataClient;
   }
 
-  // Returns true if all the InboundDataClients have finished or cancelled.
+  // Returns true if all the InboundDataClients have finished or cancelled and no WindowedValues
+  // remain on the queue.
   private boolean allDone() {
     for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
       if (!inboundDataClient.isDone()) {
         return false;
       }
     }
+    if (!this.queue.isEmpty()) {
+      return false;
+    }
     return true;
   }
 
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
index 32797fa..f786a7c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
@@ -243,7 +243,7 @@ public class QueueingBeamFnDataClientTest {
     }
   }
 
-  @Test(timeout = 10000)
+  @Test(timeout = 100000)
   public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
     CountDownLatch waitForClientToConnect = new CountDownLatch(1);
     // Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
@@ -329,6 +329,11 @@ public class QueueingBeamFnDataClientTest {
 
       // Fail all InboundObservers if any of the downstream consumers fail.
       // This allows the ProcessBundlerHandler to unblock everything and fail properly.
+
+      // Wait for these threads to terminate
+      sendElementsFuture.get();
+      drainElementsFuture.get();
+
       boolean intentionallyFailedA = false;
       try {
         readFutureA.awaitCompletion();
@@ -346,14 +351,9 @@ public class QueueingBeamFnDataClientTest {
         if (e.getCause() instanceof RuntimeException) {
           intentionallyFailedB = true;
         }
-      } catch (Exception e) {
-        intentionallyFailedB = true;
       }
       assertTrue(intentionallyFailedB);
 
-      // Wait for these threads to terminate
-      sendElementsFuture.get();
-      drainElementsFuture.get();
     } finally {
       server.shutdownNow();
     }