You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/03/05 18:13:42 UTC
[beam] branch master updated: [BEAM-6612] Improve java SDK
performance by using a LinkedBlockingQueue in QueueingBeamFnDataClient.
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ccdd32 [BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue in QueueingBeamFnDataClient.
new 1107fc1 Merge pull request #7895: [BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue in QueueingBeamFnDataClient
5ccdd32 is described below
commit 5ccdd322975b6d63197b976a60f0f6a1c34c181d
Author: Alex Amato <aj...@google.com>
AuthorDate: Tue Feb 19 11:25:04 2019 -0800
[BEAM-6612] Improve java SDK performance by using a LinkedBlockingQueue
in QueueingBeamFnDataClient.
---
.../sdk/fn/data/CompletableFutureInboundDataClient.java | 4 +++-
.../beam/fn/harness/data/QueueingBeamFnDataClient.java | 14 ++++++++++----
.../beam/fn/harness/data/QueueingBeamFnDataClientTest.java | 12 ++++++------
3 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
index 463a168..5f02e6e 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
@@ -65,6 +65,8 @@ public class CompletableFutureInboundDataClient implements InboundDataClient {
@Override
public void fail(Throwable t) {
- future.completeExceptionally(t);
+ // Use obtrudeException instead of completeExceptionally, forcing any future calls to .get()
+ // to raise the exception, even if the future is already completed.
+ future.obtrudeException(t);
}
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
index 194672d..ebcb903 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
@@ -18,7 +18,7 @@
package org.apache.beam.fn.harness.data;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -39,15 +39,17 @@ import org.slf4j.LoggerFactory;
*/
public class QueueingBeamFnDataClient implements BeamFnDataClient {
+ private static final int QUEUE_SIZE = 1000;
+
private static final Logger LOG = LoggerFactory.getLogger(QueueingBeamFnDataClient.class);
private final BeamFnDataClient mainClient;
- private final SynchronousQueue<ConsumerAndData> queue;
+ private final LinkedBlockingQueue<ConsumerAndData> queue;
private final ConcurrentHashMap<InboundDataClient, Object> inboundDataClients;
public QueueingBeamFnDataClient(BeamFnDataClient mainClient) {
this.mainClient = mainClient;
- this.queue = new SynchronousQueue<>();
+ this.queue = new LinkedBlockingQueue<ConsumerAndData>(QUEUE_SIZE);
this.inboundDataClients = new ConcurrentHashMap<>();
}
@@ -71,13 +73,17 @@ public class QueueingBeamFnDataClient implements BeamFnDataClient {
return inboundDataClient;
}
- // Returns true if all the InboundDataClients have finished or cancelled.
+ // Returns true if all the InboundDataClients have finished or cancelled and no WindowedValues
+ // remain on the queue.
private boolean allDone() {
for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
if (!inboundDataClient.isDone()) {
return false;
}
}
+ if (!this.queue.isEmpty()) {
+ return false;
+ }
return true;
}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
index 32797fa..f786a7c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
@@ -243,7 +243,7 @@ public class QueueingBeamFnDataClientTest {
}
}
- @Test(timeout = 10000)
+ @Test(timeout = 100000)
public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
CountDownLatch waitForClientToConnect = new CountDownLatch(1);
// Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
@@ -329,6 +329,11 @@ public class QueueingBeamFnDataClientTest {
// Fail all InboundObservers if any of the downstream consumers fail.
// This allows the ProcessBundlerHandler to unblock everything and fail properly.
+
+ // Wait for these threads to terminate
+ sendElementsFuture.get();
+ drainElementsFuture.get();
+
boolean intentionallyFailedA = false;
try {
readFutureA.awaitCompletion();
@@ -346,14 +351,9 @@ public class QueueingBeamFnDataClientTest {
if (e.getCause() instanceof RuntimeException) {
intentionallyFailedB = true;
}
- } catch (Exception e) {
- intentionallyFailedB = true;
}
assertTrue(intentionallyFailedB);
- // Wait for these threads to terminate
- sendElementsFuture.get();
- drainElementsFuture.get();
} finally {
server.shutdownNow();
}