You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/23 00:17:23 UTC
[2/4] beam git commit: Move Common Fn Execution Concepts to
fn-execution
Move Common Fn Execution Concepts to fn-execution
Move Stream control packages to fn-execution. Update java package names.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9e2be91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9e2be91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9e2be91
Branch: refs/heads/master
Commit: f9e2be91836fd7fb25ed772a35d9626a7c5a2cc6
Parents: cbb3a73
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 9 18:31:26 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 22 16:16:41 2017 -0800
----------------------------------------------------------------------
runners/java-fn-execution/pom.xml | 29 ---
.../beam/runners/fnexecution/ServerFactory.java | 2 +-
.../runners/fnexecution/ServerFactoryTest.java | 6 +-
sdks/java/fn-execution/build.gradle | 10 +-
sdks/java/fn-execution/pom.xml | 49 ++--
.../beam/sdk/fn/stream/AdvancingPhaser.java | 36 +++
.../sdk/fn/stream/BufferingStreamObserver.java | 171 +++++++++++++
.../apache/beam/sdk/fn/stream/DataStreams.java | 249 +++++++++++++++++++
.../sdk/fn/stream/DirectStreamObserver.java | 71 ++++++
.../ForwardingClientResponseObserver.java | 68 +++++
.../apache/beam/sdk/fn/stream/package-info.java | 22 ++
.../org/apache/beam/harness/test/Consumer.java | 26 --
.../org/apache/beam/harness/test/Supplier.java | 26 --
.../apache/beam/harness/test/TestExecutors.java | 93 -------
.../beam/harness/test/TestExecutorsTest.java | 175 -------------
.../apache/beam/harness/test/TestStreams.java | 185 --------------
.../beam/harness/test/TestStreamsTest.java | 109 --------
.../beam/sdk/fn/stream/AdvancingPhaserTest.java | 53 ++++
.../fn/stream/BufferingStreamObserverTest.java | 155 ++++++++++++
.../beam/sdk/fn/stream/DataStreamsTest.java | 167 +++++++++++++
.../sdk/fn/stream/DirectStreamObserverTest.java | 145 +++++++++++
.../ForwardingClientResponseObserverTest.java | 60 +++++
.../org/apache/beam/sdk/fn/test/Consumer.java | 26 ++
.../org/apache/beam/sdk/fn/test/Supplier.java | 26 ++
.../apache/beam/sdk/fn/test/TestExecutors.java | 93 +++++++
.../beam/sdk/fn/test/TestExecutorsTest.java | 175 +++++++++++++
.../apache/beam/sdk/fn/test/TestStreams.java | 185 ++++++++++++++
.../beam/sdk/fn/test/TestStreamsTest.java | 109 ++++++++
.../org/apache/beam/fn/harness/FnHarness.java | 2 +-
.../beam/fn/harness/state/BagUserState.java | 2 +-
.../beam/fn/harness/stream/AdvancingPhaser.java | 36 ---
.../harness/stream/BufferingStreamObserver.java | 166 -------------
.../beam/fn/harness/stream/DataStreams.java | 229 -----------------
.../fn/harness/stream/DirectStreamObserver.java | 71 ------
.../ForwardingClientResponseObserver.java | 63 -----
.../harness/stream/StreamObserverFactory.java | 20 +-
.../fn/harness/BeamFnDataReadRunnerTest.java | 4 +-
.../apache/beam/fn/harness/FnHarnessTest.java | 4 +-
.../control/BeamFnControlClientTest.java | 2 +-
.../fn/harness/control/RegisterHandlerTest.java | 4 +-
...BeamFnDataBufferingOutboundObserverTest.java | 2 +-
.../harness/data/BeamFnDataGrpcClientTest.java | 4 +-
.../data/BeamFnDataGrpcMultiplexerTest.java | 2 +-
.../logging/BeamFnLoggingClientTest.java | 2 +-
.../state/BeamFnStateGrpcClientCacheTest.java | 2 +-
.../fn/harness/stream/AdvancingPhaserTest.java | 48 ----
.../stream/BufferingStreamObserverTest.java | 146 -----------
.../beam/fn/harness/stream/DataStreamsTest.java | 167 -------------
.../stream/DirectStreamObserverTest.java | 139 -----------
.../ForwardingClientResponseObserverTest.java | 60 -----
.../stream/StreamObserverFactoryTest.java | 3 +
51 files changed, 1865 insertions(+), 1834 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index 6ff08b7..3ebcfd0 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -32,35 +32,6 @@
<packaging>jar</packaging>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <execution>
- <id>enforce-banned-dependencies</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <rules>
- <bannedDependencies>
- <excludes>
- <exclude>com.google.guava:guava-jdk5</exclude>
- <exclude>com.google.protobuf:protobuf-lite</exclude>
- <exclude>org.apache.beam:beam-sdks-java-core</exclude>
- </excludes>
- </bannedDependencies>
- </rules>
- <fail>true</fail>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 93c787d..bb45d08 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -28,8 +28,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
/**
* A {@link Server gRPC server} factory.
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index e0d7bf9..e9b5fa6 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -38,13 +38,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
-import org.apache.beam.harness.test.Consumer;
-import org.apache.beam.harness.test.TestStreams;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
import org.junit.Test;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/build.gradle
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/build.gradle b/sdks/java/fn-execution/build.gradle
index 1ffe428..69ec54a 100644
--- a/sdks/java/fn-execution/build.gradle
+++ b/sdks/java/fn-execution/build.gradle
@@ -21,17 +21,10 @@ applyJavaNature()
description = "Apache Beam :: SDKs :: Java :: Fn Execution"
-configurations.all {
- // Fn Execution contains shared utilities for Runners and Harnesses which use
- // the Portability framework. Runner-side interactions must not require a
- // dependency on any particular SDK, so this library must not introduce such an
- // edge.
- exclude group: "org.apache.beam", module: "beam-sdks-java-core"
-}
-
dependencies {
compile library.java.guava
shadow project(path: ":beam-model-parent:beam-model-pipeline", configuration: "shadow")
+ shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: "shadow")
shadow library.java.grpc_core
shadow library.java.grpc_stub
shadow library.java.grpc_netty
@@ -39,6 +32,7 @@ dependencies {
testCompile library.java.junit
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
+ testCompile library.java.mockito_core
}
task packageTests(type: Jar) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
index 3bdec38..773873e 100644
--- a/sdks/java/fn-execution/pom.xml
+++ b/sdks/java/fn-execution/pom.xml
@@ -34,39 +34,6 @@
<packaging>jar</packaging>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <execution>
- <id>enforce-banned-dependencies</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <rules>
- <bannedDependencies>
- <excludes>
- <exclude>com.google.guava:guava-jdk5</exclude>
- <exclude>com.google.protobuf:protobuf-lite</exclude>
- <!-- Fn Execution contains shared utilities for Runners and Harnesses which use
- the Portability framework. Runner-side interactions must not require a
- dependency on any particular SDK, so this library must not introduce such an
- edge. -->
- <exclude>org.apache.beam:beam-sdks-java-core</exclude>
- </excludes>
- </bannedDependencies>
- </rules>
- <fail>true</fail>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -74,6 +41,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
@@ -111,5 +88,11 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java
new file mode 100644
index 0000000..c091705
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import java.util.concurrent.Phaser;
+
+/**
+ * A {@link Phaser} which never terminates. The default {@link Phaser} implementation terminates
+ * when an advancement leaves it with zero registered parties.
+ */
+public final class AdvancingPhaser extends Phaser {
+ public AdvancingPhaser(int numParties) {
+ super(numParties);
+ }
+
+ @Override
+ protected boolean onAdvance(int phase, int registeredParties) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
new file mode 100644
index 0000000..b541e5f
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing
+ * thread responsible for interacting with the underlying {@link CallStreamObserver}.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
+ * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Callers
+ * are expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver}
+ * becomes ready.
+ */
+@ThreadSafe
+public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+ private static final Object POISON_PILL = new Object();
+ private final LinkedBlockingDeque<T> queue;
+ private final Phaser phaser;
+ private final CallStreamObserver<T> outboundObserver;
+ private final Future<?> queueDrainer;
+ private final int bufferSize;
+
+ public BufferingStreamObserver(
+ Phaser phaser,
+ CallStreamObserver<T> outboundObserver,
+ ExecutorService executor,
+ int bufferSize) {
+ this.phaser = phaser;
+ this.bufferSize = bufferSize;
+ this.queue = new LinkedBlockingDeque<>(bufferSize);
+ this.outboundObserver = outboundObserver;
+ this.queueDrainer =
+ executor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ drainQueue();
+ }
+ });
+ }
+
+ private void drainQueue() {
+ try {
+ while (true) {
+ int currentPhase = phaser.getPhase();
+ while (outboundObserver.isReady()) {
+ T value = queue.take();
+ if (value != POISON_PILL) {
+ outboundObserver.onNext(value);
+ } else {
+ return;
+ }
+ }
+ phaser.awaitAdvance(currentPhase);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void onNext(T value) {
+ try {
+ // Attempt to add an element to the bounded queue occasionally checking to see
+ // if the queue drainer is still alive.
+ while (!queue.offer(value, 60, TimeUnit.SECONDS)) {
+ checkState(!queueDrainer.isDone(), "Stream observer has finished.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ synchronized (outboundObserver) {
+ // If we are done, then a previous caller has already shut down the queue processing thread
+ // hence we don't need to do it again.
+ if (!queueDrainer.isDone()) {
+ // We check to see if we were able to successfully insert the poison pill at the front of
+ // the queue to cancel the processing thread eagerly or if the processing thread is done.
+ try {
+ // We shouldn't attempt to insert into the queue if the queue drainer thread is done
+ // since the queue may be full and nothing will be emptying it.
+ while (!queueDrainer.isDone()
+ && !queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)) {}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ waitTillFinish();
+ }
+ outboundObserver.onError(t);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ synchronized (outboundObserver) {
+ // If we are done, then a previous caller has already shut down the queue processing thread
+ // hence we don't need to do it again.
+ if (!queueDrainer.isDone()) {
+ // We check to see if we were able to successfully insert the poison pill at the end of
+ // the queue forcing the remainder of the elements to be processed or if the processing
+ // thread is done.
+ try {
+ // We shouldn't attempt to insert into the queue if the queue drainer thread is done
+ // since the queue may be full and nothing will be emptying it.
+ while (!queueDrainer.isDone()
+ && !queue.offerLast((T) POISON_PILL, 60, TimeUnit.SECONDS)) {}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ waitTillFinish();
+ }
+ outboundObserver.onCompleted();
+ }
+ }
+
+ @VisibleForTesting
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ private void waitTillFinish() {
+ try {
+ queueDrainer.get();
+ } catch (CancellationException e) {
+ // Cancellation is expected
+ return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
new file mode 100644
index 0000000..35abc4c
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingInputStream;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
+ * {@link #outbound(OutputChunkConsumer)} treats a single {@link OutputStream} as multiple
+ * {@link ByteString}s.
+ */
+// TODO: Migrate logic from BeamFnDataBufferingOutboundObserver to support Outbound
+public class DataStreams {
+ /**
+ * Converts multiple {@link ByteString}s into a single {@link InputStream}.
+ *
+ * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until
+ * either it knows that no more values will be provided or it has the next {@link ByteString}.
+ */
+ public static InputStream inbound(Iterator<ByteString> bytes) {
+ return new Inbound(bytes);
+ }
+
+ /**
+ * Converts a single {@link OutputStream} into multiple {@link ByteString ByteStrings}.
+ */
+ public static OutputStream outbound(OutputChunkConsumer<ByteString> consumer) {
+ // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Reads chunks of output.
+ *
+ * @deprecated Used as a temporary placeholder until implementation of
+ * {@link #outbound(OutputChunkConsumer)}.
+ */
+ @Deprecated
+ public interface OutputChunkConsumer<T> {
+ void read(T chunk) throws Exception;
+ }
+
+ /**
+ * An input stream which concatenates multiple {@link ByteString}s. Lazily accesses the
+ * supplied {@link Iterator} only on first access of this input stream.
+ *
+ * <p>Closing this input stream has no effect.
+ */
+ private static class Inbound<T> extends InputStream {
+ private static final InputStream EMPTY_STREAM = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return -1;
+ }
+ };
+
+ private final Iterator<ByteString> bytes;
+ private InputStream currentStream;
+
+ public Inbound(Iterator<ByteString> bytes) {
+ this.currentStream = EMPTY_STREAM;
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int rval = -1;
+ // Move on to the next stream if we have read nothing
+ while ((rval = currentStream.read()) == -1 && bytes.hasNext()) {
+ currentStream = bytes.next().newInput();
+ }
+ return rval;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int remainingLen = len;
+ while ((remainingLen -= ByteStreams.read(
+ currentStream, b, off + len - remainingLen, remainingLen)) > 0) {
+ if (bytes.hasNext()) {
+ currentStream = bytes.next().newInput();
+ } else {
+ int bytesRead = len - remainingLen;
+ return bytesRead > 0 ? bytesRead : -1;
+ }
+ }
+ return len - remainingLen;
+ }
+ }
+
+ /**
+ * An adapter which converts an {@link InputStream} to an {@link Iterator} of {@code T} values
+ * using the specified {@link Coder}.
+ *
+ * <p>Note that this adapter follows the Beam Fn API specification for forcing values that decode
+ * consuming zero bytes to consume exactly one byte.
+ *
+ * <p>Note that access to the underlying {@link InputStream} is lazy and will only be invoked on
+ * first access to {@link #next()} or {@link #hasNext()}.
+ */
+ public static class DataStreamDecoder<T> implements Iterator<T> {
+ private enum State { READ_REQUIRED, HAS_NEXT, EOF };
+
+ private final CountingInputStream countingInputStream;
+ private final PushbackInputStream pushbackInputStream;
+ private final Coder<T> coder;
+ private State currentState;
+ private T next;
+ public DataStreamDecoder(Coder<T> coder, InputStream inputStream) {
+ this.currentState = State.READ_REQUIRED;
+ this.coder = coder;
+ this.pushbackInputStream = new PushbackInputStream(inputStream, 1);
+ this.countingInputStream = new CountingInputStream(pushbackInputStream);
+ }
+
+ @Override
+ public boolean hasNext() {
+ switch (currentState) {
+ case EOF:
+ return false;
+ case READ_REQUIRED:
+ try {
+ int nextByte = pushbackInputStream.read();
+ if (nextByte == -1) {
+ currentState = State.EOF;
+ return false;
+ }
+
+ pushbackInputStream.unread(nextByte);
+ long count = countingInputStream.getCount();
+ next = coder.decode(countingInputStream);
+ // Skip one byte if decoding the value consumed 0 bytes.
+ if (countingInputStream.getCount() - count == 0) {
+ checkState(countingInputStream.read() != -1, "Unexpected EOF reached");
+ }
+ currentState = State.HAS_NEXT;
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ return true;
+ case HAS_NEXT:
+ return true;
+ }
+ throw new IllegalStateException(String.format("Unknown state %s", currentState));
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ currentState = State.READ_REQUIRED;
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Allows for one or more writing threads to append values to this iterator while one reading
+ * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
+ * available or this has been closed.
+ *
+ * <p>External synchronization must be provided if multiple readers would like to access the
+ * {@link Iterator#hasNext()} and {@link Iterator#next()} methods.
+ *
+ * <p>The order of values which are appended to this iterator is nondeterministic when multiple
+ * threads call {@link #accept(Object)}.
+ */
+ public static class BlockingQueueIterator<T>
+ implements AutoCloseable, Iterator<T> {
+ private static final Object POISION_PILL = new Object();
+ private final BlockingQueue<T> queue;
+
+ /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */
+ private T currentElement;
+
+ public BlockingQueueIterator(BlockingQueue<T> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void close() throws Exception {
+ queue.put((T) POISION_PILL);
+ }
+
+ public void accept(T t) throws Exception {
+ queue.put(t);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (currentElement == null) {
+ try {
+ currentElement = queue.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ return currentElement != POISION_PILL;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ T rval = currentElement;
+ currentElement = null;
+ return rval;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
new file mode 100644
index 0000000..eb7183f
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.Phaser;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A {@link StreamObserver} which uses synchronization on the underlying
+ * {@link CallStreamObserver} to provide thread safety.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
+ * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready.
+ * Creator is expected to advance the {@link Phaser} whenever the underlying
+ * {@link CallStreamObserver} becomes ready.
+ */
+@ThreadSafe
+public final class DirectStreamObserver<T> implements StreamObserver<T> {
+ private final Phaser phaser;
+ private final CallStreamObserver<T> outboundObserver;
+
+ public DirectStreamObserver(
+ Phaser phaser,
+ CallStreamObserver<T> outboundObserver) {
+ this.phaser = phaser;
+ this.outboundObserver = outboundObserver;
+ }
+
+ @Override
+ public void onNext(T value) {
+ int phase = phaser.getPhase();
+ if (!outboundObserver.isReady()) {
+ phaser.awaitAdvance(phase);
+ }
+ synchronized (outboundObserver) {
+ outboundObserver.onNext(value);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ synchronized (outboundObserver) {
+ outboundObserver.onError(t);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ synchronized (outboundObserver) {
+ outboundObserver.onCompleted();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java
new file mode 100644
index 0000000..958c69b
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * A {@link ClientResponseObserver} that hands every {@link StreamObserver} callback to a wrapped
+ * observer.
+ *
+ * <p>Its purpose is to let an existing {@link StreamObserver} take part in a call while still
+ * installing an {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler} on
+ * the call's stream before it starts.
+ *
+ * <p>Note that the type arguments are passed to {@link ClientResponseObserver} in swapped order:
+ * {@code ReqT} is the type of the values delivered to {@link #onNext} and forwarded to the
+ * wrapped observer, while {@code RespT} is the element type of the
+ * {@link ClientCallStreamObserver} given to {@link #beforeStart}.
+ *
+ * <p>No synchronization is added here; thread safety is whatever the wrapped observer provides.
+ */
+public final class ForwardingClientResponseObserver<ReqT, RespT>
+    implements ClientResponseObserver<RespT, ReqT> {
+  private final StreamObserver<ReqT> inboundObserver;
+  private final Runnable onReadyHandler;
+
+  ForwardingClientResponseObserver(
+      StreamObserver<ReqT> delegate, Runnable readyCallback) {
+    inboundObserver = delegate;
+    onReadyHandler = readyCallback;
+  }
+
+  /**
+   * Returns an observer that forwards to {@code inbound} and registers {@code onReadyHandler}
+   * on the call stream in {@link #beforeStart}.
+   */
+  public static <ReqT, RespT> ForwardingClientResponseObserver<ReqT, RespT> create(
+      StreamObserver<ReqT> inbound, Runnable onReadyHandler) {
+    return new ForwardingClientResponseObserver<ReqT, RespT>(inbound, onReadyHandler);
+  }
+
+  /** Registers the on-ready handler on the call stream. */
+  @Override
+  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
+    stream.setOnReadyHandler(onReadyHandler);
+  }
+
+  /** Passes the value through to the wrapped observer. */
+  @Override
+  public void onNext(ReqT value) {
+    inboundObserver.onNext(value);
+  }
+
+  /** Passes the error through to the wrapped observer. */
+  @Override
+  public void onError(Throwable t) {
+    inboundObserver.onError(t);
+  }
+
+  /** Passes completion through to the wrapped observer. */
+  @Override
+  public void onCompleted() {
+    inboundObserver.onCompleted();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java
new file mode 100644
index 0000000..6aa2729
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC stream management.
+ */
+package org.apache.beam.sdk.fn.stream;
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
deleted file mode 100644
index 279fc29..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-/**
- * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers.
- */
-public interface Consumer<T> {
- void accept(T item);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
deleted file mode 100644
index 629afc2..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-/**
- * A fork of the Java 8 Supplier interface, to enable migrations.
- */
-public interface Supplier<T> {
- T get();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
deleted file mode 100644
index ca12d5a..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * A {@link TestRule} that validates that all submitted tasks finished and were completed. This
- * allows for testing that tasks have exercised the appropriate shutdown logic.
- */
-public class TestExecutors {
- public static TestExecutorService from(final ExecutorService staticExecutorService) {
- return from(new Supplier<ExecutorService>() {
- @Override
- public ExecutorService get() {
- return staticExecutorService;
- }
- });
- }
-
- public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
- return new FromSupplier(executorServiceSuppler);
- }
-
- /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */
- public interface TestExecutorService extends ExecutorService, TestRule {}
-
- private static class FromSupplier extends ForwardingExecutorService
- implements TestExecutorService {
- private final Supplier<ExecutorService> executorServiceSupplier;
- private ExecutorService delegate;
-
- private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) {
- this.executorServiceSupplier = executorServiceSupplier;
- }
-
- @Override
- public Statement apply(final Statement statement, Description arg1) {
- return new Statement() {
- @Override
- public void evaluate() throws Throwable {
- Throwable thrown = null;
- delegate = executorServiceSupplier.get();
- try {
- statement.evaluate();
- } catch (Throwable t) {
- thrown = t;
- }
- shutdown();
- if (!awaitTermination(5, TimeUnit.SECONDS)) {
- shutdownNow();
- IllegalStateException e =
- new IllegalStateException("Test executor failed to shutdown cleanly.");
- if (thrown != null) {
- thrown.addSuppressed(e);
- } else {
- thrown = e;
- }
- }
- if (thrown != null) {
- throw thrown;
- }
- }
- };
- }
-
- @Override
- protected ExecutorService delegate() {
- return delegate;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
deleted file mode 100644
index f0c98e0..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.model.Statement;
-
-/** Tests for {@link TestExecutors}. */
-@RunWith(JUnit4.class)
-public class TestExecutorsTest {
- @Test
- public void testSuccessfulTermination() throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(service);
- final AtomicBoolean taskRan = new AtomicBoolean();
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- testService.submit(new Runnable() {
- @Override
- public void run() {
- taskRan.set(true);
- }
- });
- }
- },
- null)
- .evaluate();
- assertTrue(service.isTerminated());
- assertTrue(taskRan.get());
- }
-
- @Test
- public void testTaskBlocksForeverCausesFailure() throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(service);
- final AtomicBoolean taskStarted = new AtomicBoolean();
- final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
- try {
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- testService.submit(new Runnable() {
- @Override
- public void run() {
- taskToRun();
- }
- });
- }
-
- private void taskToRun() {
- taskStarted.set(true);
- try {
- while (true) {
- Thread.sleep(10000);
- }
- } catch (InterruptedException e) {
- taskWasInterrupted.set(true);
- return;
- }
- }
- },
- null)
- .evaluate();
- fail();
- } catch (IllegalStateException e) {
- assertEquals(IllegalStateException.class, e.getClass());
- assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
- }
- assertTrue(service.isShutdown());
- }
-
- @Test
- public void testStatementFailurePropagatedCleanly() throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(service);
- final RuntimeException exceptionToThrow = new RuntimeException();
- try {
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- throw exceptionToThrow;
- }
- },
- null)
- .evaluate();
- fail();
- } catch (RuntimeException thrownException) {
- assertSame(exceptionToThrow, thrownException);
- }
- assertTrue(service.isShutdown());
- }
-
- @Test
- public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
- throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(service);
- final AtomicBoolean taskStarted = new AtomicBoolean();
- final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
- final RuntimeException exceptionToThrow = new RuntimeException();
- try {
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- testService.submit(new Runnable() {
- @Override
- public void run() {
- taskToRun();
- }
- });
- throw exceptionToThrow;
- }
-
- private void taskToRun() {
- taskStarted.set(true);
- try {
- while (true) {
- Thread.sleep(10000);
- }
- } catch (InterruptedException e) {
- taskWasInterrupted.set(true);
- return;
- }
- }
- },
- null)
- .evaluate();
- fail();
- } catch (RuntimeException thrownException) {
- assertSame(exceptionToThrow, thrownException);
- assertEquals(1, exceptionToThrow.getSuppressed().length);
- assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
- assertEquals(
- "Test executor failed to shutdown cleanly.",
- exceptionToThrow.getSuppressed()[0].getMessage());
- }
- assertTrue(service.isShutdown());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
deleted file mode 100644
index 3df743a..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-
-/** Utility methods which enable testing of {@link StreamObserver}s. */
-public class TestStreams {
- /**
- * Creates a test {@link CallStreamObserver} {@link Builder} that forwards
- * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
- */
- public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- onNext,
- TestStreams.<Throwable>noopConsumer(),
- TestStreams.noopRunnable(),
- TestStreams.alwaysTrueSupplier()));
- }
-
- /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
- public static class Builder<T> {
- private final ForwardingCallStreamObserver<T> observer;
- private Builder(ForwardingCallStreamObserver<T> observer) {
- this.observer = observer;
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link CallStreamObserver#isReady} callback.
- */
- public Builder<T> withIsReady(Supplier<Boolean> isReady) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext,
- observer.onError,
- observer.onCompleted,
- isReady));
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link StreamObserver#onCompleted} callback.
- */
- public Builder<T> withOnCompleted(Runnable onCompleted) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext,
- observer.onError,
- onCompleted,
- observer.isReady));
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link StreamObserver#onError} callback.
- */
- public Builder<T> withOnError(final Runnable onError) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext,
- new Consumer<Throwable>() {
- @Override
- public void accept(Throwable t) {
- onError.run();
- }
- },
- observer.onCompleted,
- observer.isReady));
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link StreamObserver#onError} consumer.
- */
- public Builder<T> withOnError(Consumer<Throwable> onError) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext, onError, observer.onCompleted, observer.isReady));
- }
-
- public CallStreamObserver<T> build() {
- return observer;
- }
- }
-
- private static void noop() {
- }
-
- private static Runnable noopRunnable() {
- return new Runnable() {
- @Override
- public void run() {
- }
- };
- }
-
- private static void noop(Throwable t) {
- }
-
- private static <T> Consumer<T> noopConsumer() {
- return new Consumer<T>() {
- @Override
- public void accept(T item) {
- }
- };
- }
-
- private static boolean returnTrue() {
- return true;
- }
-
- private static Supplier<Boolean> alwaysTrueSupplier() {
- return new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return true;
- }
- };
- }
-
- /** A {@link CallStreamObserver} which executes the supplied callbacks. */
- private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
- private final Consumer<T> onNext;
- private final Supplier<Boolean> isReady;
- private final Consumer<Throwable> onError;
- private final Runnable onCompleted;
-
- public ForwardingCallStreamObserver(
- Consumer<T> onNext,
- Consumer<Throwable> onError,
- Runnable onCompleted,
- Supplier<Boolean> isReady) {
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- this.isReady = isReady;
- }
-
- @Override
- public void onNext(T value) {
- onNext.accept(value);
- }
-
- @Override
- public void onError(Throwable t) {
- onError.accept(t);
- }
-
- @Override
- public void onCompleted() {
- onCompleted.run();
- }
-
- @Override
- public boolean isReady() {
- return isReady.get();
- }
-
- @Override
- public void setOnReadyHandler(Runnable onReadyHandler) {}
-
- @Override
- public void disableAutoInboundFlowControl() {}
-
- @Override
- public void request(int count) {}
-
- @Override
- public void setMessageCompression(boolean enable) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
deleted file mode 100644
index c578397..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TestStreams}. */
-@RunWith(JUnit4.class)
-public class TestStreamsTest {
- @Test
- public void testOnNextIsCalled() {
- final AtomicBoolean onNextWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(new Consumer<Boolean>() {
- @Override
- public void accept(Boolean item) {
- onNextWasCalled.set(item);
- }
- }).build().onNext(true);
- assertTrue(onNextWasCalled.get());
- }
-
- @Test
- public void testIsReadyIsCalled() {
- final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
- assertFalse(TestStreams.withOnNext(null)
- .withIsReady(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return isReadyWasCalled.getAndSet(true);
- }
- })
- .build()
- .isReady());
- assertTrue(isReadyWasCalled.get());
- }
-
- @Test
- public void testOnCompletedIsCalled() {
- final AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(null)
- .withOnCompleted(new Runnable() {
- @Override
- public void run() {
- onCompletedWasCalled.set(true);
- }
- })
- .build()
- .onCompleted();
- assertTrue(onCompletedWasCalled.get());
- }
-
- @Test
- public void testOnErrorRunnableIsCalled() {
- RuntimeException throwable = new RuntimeException();
- final AtomicBoolean onErrorWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(null)
- .withOnError(new Runnable() {
- @Override
- public void run() {
- onErrorWasCalled.set(true);
- }
- })
- .build()
- .onError(throwable);
- assertTrue(onErrorWasCalled.get());
- }
-
- @Test
- public void testOnErrorConsumerIsCalled() {
- RuntimeException throwable = new RuntimeException();
- final Collection<Throwable> onErrorWasCalled = new ArrayList<>();
- TestStreams.withOnNext(null)
- .withOnError(new Consumer<Throwable>() {
- @Override
- public void accept(Throwable item) {
- onErrorWasCalled.add(item);
- }
- })
- .build()
- .onError(throwable);
- assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java
new file mode 100644
index 0000000..3248ab2
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link AdvancingPhaser}. */
+@RunWith(JUnit4.class)
+public class AdvancingPhaserTest {
+  /**
+   * Has a worker thread arrive at a one-party phaser while the test thread waits for the phase
+   * captured beforehand to advance, then checks that the phaser has not terminated.
+   */
+  @Test
+  public void testAdvancement() throws Exception {
+    final AdvancingPhaser phaser = new AdvancingPhaser(1);
+    int currentPhase = phaser.getPhase();
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    try {
+      service.submit(new Runnable() {
+        @Override
+        public void run() {
+          phaser.arrive();
+        }
+      });
+      phaser.awaitAdvance(currentPhase);
+      assertFalse(phaser.isTerminated());
+    } finally {
+      // Shut the executor down even when the assertion above fails, so a failing test does not
+      // leave its worker thread behind.
+      service.shutdown();
+      if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
+        assertThat(service.shutdownNow(), empty());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java
new file mode 100644
index 0000000..54d02b8
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.Supplier;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BufferingStreamObserver}. */
+@RunWith(JUnit4.class)
+public class BufferingStreamObserverTest {
+ // Cached thread pool wrapped as a JUnit rule; it runs the producer tasks below and is also
+ // handed to the observer under test.
+ // NOTE(review): presumably the rule also verifies a clean executor shutdown after each test;
+ // confirm against org.apache.beam.sdk.fn.test.TestExecutors.
+ @Rule public TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool());
+
+ /**
+ * Five producers each push ten values concurrently through one observer. The downstream
+ * consumer asserts that it is never entered by two threads at once, and afterwards the test
+ * checks that all 50 values arrived and that each producer's values arrived in the order sent.
+ */
+ @Test
+ public void testThreadSafety() throws Exception {
+ final List<String> onNextValues = new ArrayList<>();
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
+ final BufferingStreamObserver<String> streamObserver =
+ new BufferingStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ // Use the atomic boolean to detect if multiple threads are in this
+ // critical section. Any thread that enters purposefully blocks by sleeping
+ // to increase the contention between threads artificially.
+ assertFalse(isCriticalSectionShared.getAndSet(true));
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+ onNextValues.add(t);
+ assertTrue(isCriticalSectionShared.getAndSet(false));
+ }
+ }).build(),
+ executor,
+ // NOTE(review): presumably the buffer capacity; confirm against BufferingStreamObserver.
+ 3);
+
+ // One task per prefix; each emits "<prefix>0" .. "<prefix>9" in order.
+ List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
+ List<Callable<String>> tasks = new ArrayList<>();
+ for (final String prefix : prefixes) {
+ tasks.add(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ });
+ }
+ // Run all producers and surface any exception they threw via Future.get().
+ List<Future<String>> results = executor.invokeAll(tasks);
+ for (Future<String> result : results) {
+ result.get();
+ }
+ streamObserver.onCompleted();
+
+ // Check that order was maintained.
+ // prefixesIndex[p] holds the next suffix expected from the producer with prefix p.
+ int[] prefixesIndex = new int[prefixes.size()];
+ assertEquals(50, onNextValues.size());
+ for (String onNextValue : onNextValues) {
+ // Each value is exactly two characters: a one-digit prefix followed by a one-digit suffix.
+ int prefix = Integer.parseInt(onNextValue.substring(0, 1));
+ int suffix = Integer.parseInt(onNextValue.substring(1, 2));
+ assertEquals(prefixesIndex[prefix], suffix);
+ prefixesIndex[prefix] += 1;
+ }
+ }
+
+ /**
+ * The downstream observer reports not-ready until {@code elementsAllowed} is set, and its
+ * consumer asserts that no element is delivered before then. The test starts the producers,
+ * waits briefly, then sets the flag and arrives at the phaser to wake anything waiting on it.
+ */
+ @Test
+ public void testIsReadyIsHonored() throws Exception {
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean elementsAllowed = new AtomicBoolean();
+ final BufferingStreamObserver<String> streamObserver =
+ new BufferingStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ // Delivery while the stream still reports not-ready is the failure being tested.
+ assertTrue(elementsAllowed.get());
+ }
+ })
+ .withIsReady(
+ new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return elementsAllowed.get();
+ }
+ })
+ .build(),
+ executor,
+ // NOTE(review): presumably the buffer capacity; confirm against BufferingStreamObserver.
+ 3);
+
+ // Start all the tasks
+ List<Future<String>> results = new ArrayList<>();
+ for (final String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
+ results.add(
+ executor.submit(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ }));
+ }
+
+ // Have them wait and then flip that we do allow elements and wake up those awaiting
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ // The flag must be set before arriving so that a woken waiter sees the stream as ready.
+ elementsAllowed.set(true);
+ phaser.arrive();
+
+ // Surface any exception thrown inside the producer tasks.
+ for (Future<String> result : results) {
+ result.get();
+ }
+ streamObserver.onCompleted();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
new file mode 100644
index 0000000..852b3d0
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.stream.DataStreams.BlockingQueueIterator;
+import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataStreams}. */
+@RunWith(Enclosed.class)
+public class DataStreamsTest {
+
+  /** Tests for {@link DataStreams.Inbound}. */
+  @RunWith(JUnit4.class)
+  public static class InboundTest {
+    private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
+    private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
+
+    /** Reading no chunks at all, or only empty chunks, must produce an empty result. */
+    @Test
+    public void testEmptyRead() throws Exception {
+      ByteString nothing = ByteString.EMPTY;
+      assertEquals(nothing, read());
+      assertEquals(nothing, read(nothing));
+      assertEquals(nothing, read(nothing, nothing));
+    }
+
+    /** Chunks are concatenated in order; empty chunks in between or at the end add nothing. */
+    @Test
+    public void testRead() throws Exception {
+      ByteString expected = BYTES_A.concat(BYTES_B);
+      assertEquals(expected, read(BYTES_A, BYTES_B));
+      assertEquals(expected, read(BYTES_A, ByteString.EMPTY, BYTES_B));
+      assertEquals(expected, read(BYTES_A, BYTES_B, ByteString.EMPTY));
+    }
+
+    /** Reads everything from {@link DataStreams#inbound} when it is fed the given chunks. */
+    private static ByteString read(ByteString... bytes) throws IOException {
+      Iterator<ByteString> chunks = Arrays.asList(bytes).iterator();
+      return ByteString.readFrom(DataStreams.inbound(chunks));
+    }
+  }
+
+  /** Tests for {@link DataStreams.BlockingQueueIterator}. */
+  @RunWith(JUnit4.class)
+  public static class BlockingQueueIteratorTest {
+    /**
+     * Writes two elements and closes the iterator before anything is read, all on the test
+     * thread, then checks that draining the iterator yields exactly those elements in order.
+     */
+    @Test(timeout = 10_000)
+    public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
+      BlockingQueueIterator<String> iterator =
+          new BlockingQueueIterator<>(new ArrayBlockingQueue<String>(3));
+
+      iterator.accept("A");
+      iterator.accept("B");
+      iterator.close();
+
+      String[] drained = Iterators.toArray(iterator, String.class);
+      assertEquals(Arrays.asList("A", "B"), Arrays.asList(drained));
+    }
+
+    /**
+     * Same scenario as the non-blocking test, but backed by a {@link SynchronousQueue} and with
+     * the iterator drained on a separate thread while the test thread writes.
+     */
+    @Test(timeout = 10_000)
+    public void testBlockingQueueIteratorWithBlocking() throws Exception {
+      // The synchronous queue only allows for one element to transfer at a time and blocks
+      // the sending/receiving parties until both parties are there.
+      final BlockingQueueIterator<String> iterator =
+          new BlockingQueueIterator<>(new SynchronousQueue<String>());
+      final SettableFuture<List<String>> valuesFuture = SettableFuture.create();
+      // Drains the iterator in the background and publishes what it saw through the future.
+      Thread reader =
+          new Thread(
+              new Runnable() {
+                @Override
+                public void run() {
+                  String[] drained = Iterators.toArray(iterator, String.class);
+                  valuesFuture.set(Arrays.asList(drained));
+                }
+              });
+      reader.start();
+
+      iterator.accept("A");
+      iterator.accept("B");
+      iterator.close();
+
+      assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
+      reader.join();
+    }
+  }
+
+  /** Tests for {@link DataStreams.DataStreamDecoder}. */
+  @RunWith(JUnit4.class)
+  public static class DataStreamDecoderTest {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    /** Decoding a stream with no encoded values yields no elements. */
+    @Test
+    public void testEmptyInputStream() throws Exception {
+      testDecoderWith(StringUtf8Coder.of());
+    }
+
+    /** Decoding a stream of several encoded strings yields them back in order. */
+    @Test
+    public void testNonEmptyInputStream() throws Exception {
+      testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ");
+    }
+
+    /**
+     * Exercises the decoder with a coder whose values encode to zero bytes. The test is skipped
+     * (via {@code assumeTrue}) if the global window coder ever starts writing bytes.
+     */
+    @Test
+    public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception {
+      CountingOutputStream byteCounter = new CountingOutputStream(ByteStreams.nullOutputStream());
+      GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, byteCounter);
+      assumeTrue(byteCounter.getCount() == 0);
+
+      testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+    }
+
+    /**
+     * Encodes {@code expected} with {@code coder}, decodes the bytes through a
+     * {@link DataStreamDecoder}, and checks that the same values come back, that {@code hasNext}
+     * keeps returning {@code false} once exhausted, and that a further {@code next} throws
+     * {@link NoSuchElementException}.
+     */
+    private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
+      ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+      for (T value : expected) {
+        int sizeBefore = encoded.size();
+        coder.encode(value, encoded);
+        // Pad an arbitrary byte when values encode to zero bytes
+        if (encoded.size() == sizeBefore) {
+          encoded.write(0);
+        }
+      }
+      byte[] payload = encoded.toByteArray();
+
+      Iterator<T> decoder = new DataStreamDecoder<>(coder, new ByteArrayInputStream(payload));
+
+      Object[] decoded = Iterators.toArray(decoder, Object.class);
+      assertArrayEquals(expected, decoded);
+
+      // Asked twice on purpose: the answer must stay false once the stream is exhausted.
+      assertFalse(decoder.hasNext());
+      assertFalse(decoder.hasNext());
+
+      // Must stay last: the expectation applies to the call that follows it.
+      thrown.expect(NoSuchElementException.class);
+      decoder.next();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java
new file mode 100644
index 0000000..d59dfbc
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.Supplier;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DirectStreamObserver}. */
+@RunWith(JUnit4.class)
+public class DirectStreamObserverTest {
+ @Rule public TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool());
+
+ @Test
+ public void testThreadSafety() throws Exception {
+ final List<String> onNextValues = new ArrayList<>();
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
+ final DirectStreamObserver<String> streamObserver =
+ new DirectStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ // Use the atomic boolean to detect if multiple threads are in this
+ // critical section. Any thread that enters purposefully blocks by sleeping
+ // to increase the contention between threads artificially.
+ assertFalse(isCriticalSectionShared.getAndSet(true));
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ onNextValues.add(t);
+ assertTrue(isCriticalSectionShared.getAndSet(false));
+ }
+ }).build());
+
+ List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
+ List<Callable<String>> tasks = new ArrayList<>();
+ for (final String prefix : prefixes) {
+ tasks.add(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ });
+ }
+ executor.invokeAll(tasks);
+ streamObserver.onCompleted();
+
+ // Check that order was maintained.
+ int[] prefixesIndex = new int[prefixes.size()];
+ assertEquals(50, onNextValues.size());
+ for (String onNextValue : onNextValues) {
+ int prefix = Integer.parseInt(onNextValue.substring(0, 1));
+ int suffix = Integer.parseInt(onNextValue.substring(1, 2));
+ assertEquals(prefixesIndex[prefix], suffix);
+ prefixesIndex[prefix] += 1;
+ }
+ }
+
+ @Test
+ public void testIsReadyIsHonored() throws Exception {
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean elementsAllowed = new AtomicBoolean();
+ final DirectStreamObserver<String> streamObserver =
+ new DirectStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ assertTrue(elementsAllowed.get());
+ }
+ }).withIsReady(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return elementsAllowed.get();
+ }
+ }).build());
+
+ // Start all the tasks
+ List<Future<String>> results = new ArrayList<>();
+ for (final String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
+ results.add(
+ executor.submit(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ }));
+ }
+
+ // Have them wait and then flip that we do allow elements and wake up those awaiting
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ elementsAllowed.set(true);
+ phaser.arrive();
+
+ for (Future<String> result : results) {
+ result.get();
+ }
+ streamObserver.onCompleted();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java
new file mode 100644
index 0000000..a71841d
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ForwardingClientResponseObserver}. */
+@RunWith(JUnit4.class)
+public class ForwardingClientResponseObserverTest {
+  /**
+   * Checks that {@code onNext}, {@code onError} and {@code onCompleted} are passed on to the
+   * delegate observer, that {@code beforeStart} registers the supplied on-ready handler on the
+   * call stream observer, and that neither mock sees any other interaction.
+   */
+  @Test
+  public void testCallsAreForwardedAndOnReadyHandlerBound() {
+    // The handler needs no behaviour; the test only checks that this instance gets registered.
+    Runnable readyHandler =
+        new Runnable() {
+          @Override
+          public void run() {}
+        };
+    @SuppressWarnings("unchecked")
+    StreamObserver<Object> delegate = mock(StreamObserver.class);
+    @SuppressWarnings("unchecked")
+    ClientCallStreamObserver<Object> outboundCall = mock(ClientCallStreamObserver.class);
+    ClientResponseObserver<Object, Object> underTest =
+        new ForwardingClientResponseObserver<>(delegate, readyHandler);
+
+    underTest.onNext("A");
+    verify(delegate).onNext("A");
+
+    Throwable failure = new RuntimeException();
+    underTest.onError(failure);
+    verify(delegate).onError(failure);
+
+    underTest.onCompleted();
+    verify(delegate).onCompleted();
+
+    underTest.beforeStart(outboundCall);
+    verify(outboundCall).setOnReadyHandler(readyHandler);
+
+    verifyNoMoreInteractions(delegate, outboundCall);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java
new file mode 100644
index 0000000..184a6e2
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+/**
+ * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers.
+ *
+ * <p>The single method, {@code accept}, takes one item of type {@code T} and returns nothing.
+ *
+ * @param <T> the type of item accepted
+ */
+public interface Consumer<T> {
+ void accept(T item);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java
new file mode 100644
index 0000000..b0bae2e
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+/**
+ * A fork of the Java 8 Supplier interface, to enable migrations.
+ *
+ * <p>The single method, {@code get}, takes no arguments and returns a value of type {@code T}.
+ *
+ * @param <T> the type of value supplied
+ */
+public interface Supplier<T> {
+ T get();
+}