You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/30 20:48:53 UTC
[2/6] beam git commit: A proposal for a portability framework to
execute user definable functions.
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
new file mode 100644
index 0000000..20566ea
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.data;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnDataGrpc;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataGrpcClient}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataGrpcClientTest {
+ private static final Coder<WindowedValue<String>> CODER =
+ LengthPrefixCoder.of(
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
+ GlobalWindow.Coder.INSTANCE));
+ private static final KV<Long, BeamFnApi.Target> KEY_A = KV.of(
+ 12L,
+ BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(34L).setName("targetA").build());
+ private static final KV<Long, BeamFnApi.Target> KEY_B = KV.of(
+ 56L,
+ BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(78L).setName("targetB").build());
+
+ private static final BeamFnApi.Elements ELEMENTS_A_1;
+ private static final BeamFnApi.Elements ELEMENTS_A_2;
+ private static final BeamFnApi.Elements ELEMENTS_B_1;
+ static {
+ try {
+ ELEMENTS_A_1 = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(KEY_A.getKey())
+ .setTarget(KEY_A.getValue())
+ .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("ABC")))
+ .concat(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("DEF"))))))
+ .build();
+ ELEMENTS_A_2 = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(KEY_A.getKey())
+ .setTarget(KEY_A.getValue())
+ .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("GHI")))))
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(KEY_A.getKey())
+ .setTarget(KEY_A.getValue()))
+ .build();
+ ELEMENTS_B_1 = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(KEY_B.getKey())
+ .setTarget(KEY_B.getValue())
+ .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("JKL")))
+ .concat(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("MNO"))))))
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(KEY_B.getKey())
+ .setTarget(KEY_B.getValue()))
+ .build();
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ @Test
+ public void testForInboundConsumer() throws Exception {
+ CountDownLatch waitForClientToConnect = new CountDownLatch(1);
+ Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
+ Collection<WindowedValue<String>> inboundValuesB = new ConcurrentLinkedQueue<>();
+ Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
+ TestStreams.withOnNext(inboundServerValues::add).build();
+
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
+ BeamFnApi.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(new BeamFnDataGrpc.BeamFnDataImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.Elements> data(
+ StreamObserver<BeamFnApi.Elements> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ waitForClientToConnect.countDown();
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+ try {
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+ BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient(
+ PipelineOptionsFactory.create(),
+ (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+ this::createStreamForTest);
+
+ CompletableFuture<Void> readFutureA = clientFactory.forInboundConsumer(
+ apiServiceDescriptor,
+ KEY_A,
+ CODER,
+ inboundValuesA::add);
+
+ waitForClientToConnect.await();
+ outboundServerObserver.get().onNext(ELEMENTS_A_1);
+ // Purposefully transmit some data before the consumer for B is bound showing that
+ // data is not lost
+ outboundServerObserver.get().onNext(ELEMENTS_B_1);
+ Thread.sleep(100);
+
+ CompletableFuture<Void> readFutureB = clientFactory.forInboundConsumer(
+ apiServiceDescriptor,
+ KEY_B,
+ CODER,
+ inboundValuesB::add);
+
+ // Show that out of order stream completion can occur.
+ readFutureB.get();
+ assertThat(inboundValuesB, contains(
+ valueInGlobalWindow("JKL"), valueInGlobalWindow("MNO")));
+
+ outboundServerObserver.get().onNext(ELEMENTS_A_2);
+ readFutureA.get();
+ assertThat(inboundValuesA, contains(
+ valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
+ } finally {
+ server.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testForInboundConsumerThatThrows() throws Exception {
+ CountDownLatch waitForClientToConnect = new CountDownLatch(1);
+ AtomicInteger consumerInvoked = new AtomicInteger();
+ Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
+ TestStreams.withOnNext(inboundServerValues::add).build();
+
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
+ BeamFnApi.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(new BeamFnDataGrpc.BeamFnDataImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.Elements> data(
+ StreamObserver<BeamFnApi.Elements> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ waitForClientToConnect.countDown();
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+ RuntimeException exceptionToThrow = new RuntimeException("TestFailure");
+ try {
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+ BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient(
+ PipelineOptionsFactory.create(),
+ (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+ this::createStreamForTest);
+
+ CompletableFuture<Void> readFuture = clientFactory.forInboundConsumer(
+ apiServiceDescriptor,
+ KEY_A,
+ CODER,
+ new ThrowingConsumer<WindowedValue<String>>() {
+ @Override
+ public void accept(WindowedValue<String> t) throws Exception {
+ consumerInvoked.incrementAndGet();
+ throw exceptionToThrow;
+ }
+ });
+
+ waitForClientToConnect.await();
+
+ // This first message should cause a failure afterwards all other messages are dropped.
+ outboundServerObserver.get().onNext(ELEMENTS_A_1);
+ outboundServerObserver.get().onNext(ELEMENTS_A_2);
+
+ try {
+ readFuture.get();
+ fail("Expected channel to fail");
+ } catch (ExecutionException e) {
+ assertEquals(exceptionToThrow, e.getCause());
+ }
+ // The server should not have received any values
+ assertThat(inboundServerValues, empty());
+ // The consumer should have only been invoked once
+ assertEquals(1, consumerInvoked.get());
+ } finally {
+ server.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testForOutboundConsumer() throws Exception {
+ CountDownLatch waitForInboundServerValuesCompletion = new CountDownLatch(2);
+ Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
+ CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
+ TestStreams.withOnNext(
+ new Consumer<BeamFnApi.Elements>() {
+ @Override
+ public void accept(BeamFnApi.Elements t) {
+ inboundServerValues.add(t);
+ waitForInboundServerValuesCompletion.countDown();
+ }
+ }
+ ).build();
+
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
+ BeamFnApi.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(new BeamFnDataGrpc.BeamFnDataImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.Elements> data(
+ StreamObserver<BeamFnApi.Elements> outboundObserver) {
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+ try {
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+ BeamFnDataGrpcClient clientFactory = new BeamFnDataGrpcClient(
+ PipelineOptionsFactory.fromArgs(
+ new String[]{ "--experiments=beam_fn_api_data_buffer_limit=20" }).create(),
+ (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+ this::createStreamForTest);
+
+ try (CloseableThrowingConsumer<WindowedValue<String>> consumer =
+ clientFactory.forOutboundConsumer(apiServiceDescriptor, KEY_A, CODER)) {
+ consumer.accept(valueInGlobalWindow("ABC"));
+ consumer.accept(valueInGlobalWindow("DEF"));
+ consumer.accept(valueInGlobalWindow("GHI"));
+ }
+
+ waitForInboundServerValuesCompletion.await();
+
+ assertThat(inboundServerValues, contains(ELEMENTS_A_1, ELEMENTS_A_2));
+ } finally {
+ server.shutdownNow();
+ }
+ }
+
+ private <ReqT, RespT> StreamObserver<RespT> createStreamForTest(
+ Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+ StreamObserver<ReqT> handler) {
+ return clientFactory.apply(handler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
new file mode 100644
index 0000000..38d9e2c
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+
+/** Tests for {@link BeamFnDataGrpcMultiplexer}. */
+public class BeamFnDataGrpcMultiplexerTest {
+ private static final BeamFnApi.ApiServiceDescriptor DESCRIPTOR =
+ BeamFnApi.ApiServiceDescriptor.newBuilder().setUrl("test").build();
+ private static final KV<Long, BeamFnApi.Target> OUTPUT_LOCATION = KV.of(777L,
+ BeamFnApi.Target.newBuilder()
+ .setName("name")
+ .setPrimitiveTransformReference(888L)
+ .build());
+ private static final BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(OUTPUT_LOCATION.getKey())
+ .setTarget(OUTPUT_LOCATION.getValue())
+ .setData(ByteString.copyFrom(new byte[1])))
+ .build();
+ private static final BeamFnApi.Elements TERMINAL_ELEMENTS = BeamFnApi.Elements.newBuilder()
+ .addData(BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(OUTPUT_LOCATION.getKey())
+ .setTarget(OUTPUT_LOCATION.getValue()))
+ .build();
+
+ @Test
+ public void testOutboundObserver() {
+ Collection<BeamFnApi.Elements> values = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(
+ DESCRIPTOR,
+ (StreamObserver<BeamFnApi.Elements> inboundObserver)
+ -> TestStreams.withOnNext(values::add).build());
+ multiplexer.getOutboundObserver().onNext(ELEMENTS);
+ assertThat(values, contains(ELEMENTS));
+ }
+
+ @Test
+ public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
+ Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
+ Collection<BeamFnApi.Elements.Data> inboundValues = new ArrayList<>();
+ BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(
+ DESCRIPTOR,
+ (StreamObserver<BeamFnApi.Elements> inboundObserver)
+ -> TestStreams.withOnNext(outboundValues::add).build());
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ // Purposefully sleep to simulate a delay in a consumer connecting.
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ multiplexer.futureForKey(OUTPUT_LOCATION).complete(inboundValues::add);
+ }
+ });
+ multiplexer.getInboundObserver().onNext(ELEMENTS);
+ assertTrue(multiplexer.consumers.containsKey(OUTPUT_LOCATION));
+ // Ensure that when we see a terminal Elements object, we remove the consumer
+ multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
+ assertFalse(multiplexer.consumers.containsKey(OUTPUT_LOCATION));
+
+ // Assert that normal and terminal Elements are passed to the consumer
+ assertThat(inboundValues, contains(ELEMENTS.getData(0), TERMINAL_ELEMENTS.getData(0)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
new file mode 100644
index 0000000..ff0e083
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataInboundObserver}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataInboundObserverTest {
+ private static final Coder<WindowedValue<String>> CODER =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+
+ @Test
+ public void testDecodingElements() throws Exception {
+ Collection<WindowedValue<String>> values = new ArrayList<>();
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+ BeamFnDataInboundObserver<String> observer = new BeamFnDataInboundObserver<>(
+ CODER,
+ values::add,
+ readFuture);
+
+ // Test decoding multiple messages
+ observer.accept(dataWith("ABC", "DEF", "GHI"));
+ assertThat(values, contains(
+ valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
+ values.clear();
+
+ // Test empty message signaling end of stream
+ assertFalse(readFuture.isDone());
+ observer.accept(dataWith());
+ assertTrue(readFuture.isDone());
+
+ // Test messages after stream is finished are discarded
+ observer.accept(dataWith("ABC", "DEF", "GHI"));
+ assertThat(values, empty());
+ }
+
+ @Test
+ public void testConsumptionFailureCompletesReadFutureAndDiscardsMessages() throws Exception {
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+ BeamFnDataInboundObserver<String> observer = new BeamFnDataInboundObserver<>(
+ CODER,
+ this::throwOnDefValue,
+ readFuture);
+
+ assertFalse(readFuture.isDone());
+ observer.accept(dataWith("ABC", "DEF", "GHI"));
+ assertTrue(readFuture.isCompletedExceptionally());
+
+ try {
+ readFuture.get();
+ fail("Expected failure");
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(RuntimeException.class));
+ assertEquals("Failure", e.getCause().getMessage());
+ }
+ }
+
+ private void throwOnDefValue(WindowedValue<String> value) {
+ if ("DEF".equals(value.getValue())) {
+ throw new RuntimeException("Failure");
+ }
+ }
+
+ private BeamFnApi.Elements.Data dataWith(String ... values) throws Exception {
+ BeamFnApi.Elements.Data.Builder builder = BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionReference(777L)
+ .setTarget(BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(999L)
+ .setName("Test"));
+ ByteString.Output output = ByteString.newOutput();
+ for (String value : values) {
+ CODER.encode(valueInGlobalWindow(value), output, Context.NESTED);
+ }
+ builder.setData(output.toByteString());
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
new file mode 100644
index 0000000..bb6a501
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.logging;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.Timestamp;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
+import java.util.logging.LogRecord;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnLoggingClient}. */
+@RunWith(JUnit4.class)
+public class BeamFnLoggingClientTest {
+
+ private static final LogRecord FILTERED_RECORD;
+ private static final LogRecord TEST_RECORD;
+ private static final LogRecord TEST_RECORD_WITH_EXCEPTION;
+
+ static {
+ FILTERED_RECORD = new LogRecord(Level.SEVERE, "FilteredMessage");
+
+ TEST_RECORD = new LogRecord(Level.FINE, "Message");
+ TEST_RECORD.setLoggerName("LoggerName");
+ TEST_RECORD.setMillis(1234567890L);
+ TEST_RECORD.setThreadID(12345);
+
+ TEST_RECORD_WITH_EXCEPTION = new LogRecord(Level.WARNING, "MessageWithException");
+ TEST_RECORD_WITH_EXCEPTION.setLoggerName("LoggerName");
+ TEST_RECORD_WITH_EXCEPTION.setMillis(1234567890L);
+ TEST_RECORD_WITH_EXCEPTION.setThreadID(12345);
+ TEST_RECORD_WITH_EXCEPTION.setThrown(new RuntimeException("ExceptionMessage"));
+ }
+
+ private static final BeamFnApi.LogEntry TEST_ENTRY =
+ BeamFnApi.LogEntry.newBuilder()
+ .setSeverity(BeamFnApi.LogEntry.Severity.DEBUG)
+ .setMessage("Message")
+ .setThread("12345")
+ .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
+ .setLogLocation("LoggerName")
+ .build();
+ private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION =
+ BeamFnApi.LogEntry.newBuilder()
+ .setSeverity(BeamFnApi.LogEntry.Severity.WARN)
+ .setMessage("MessageWithException")
+ .setTrace(getStackTraceAsString(TEST_RECORD_WITH_EXCEPTION.getThrown()))
+ .setThread("12345")
+ .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
+ .setLogLocation("LoggerName")
+ .build();
+
+ @Test
+ public void testLogging() throws Exception {
+ AtomicBoolean clientClosedStream = new AtomicBoolean();
+ Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+ TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
+ .withOnCompleted(new Runnable() {
+ @Override
+ public void run() {
+ // Remember that the client told us that this stream completed
+ clientClosedStream.set(true);
+ outboundServerObserver.get().onCompleted();
+ }
+ }).build();
+
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
+ BeamFnApi.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+ try {
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+ BeamFnLoggingClient client = new BeamFnLoggingClient(
+ PipelineOptionsFactory.fromArgs(new String[] {
+ "--defaultWorkerLogLevel=OFF",
+ "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ }).create(),
+ apiServiceDescriptor,
+ (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+ this::createStreamForTest);
+
+ // Ensure that log levels were correctly set.
+ assertEquals(Level.OFF,
+ LogManager.getLogManager().getLogger("").getLevel());
+ assertEquals(Level.FINE,
+ LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+ // Should be filtered because the default log level override is OFF
+ LogManager.getLogManager().getLogger("").log(FILTERED_RECORD);
+ // Should not be filtered because the default log level override for ConfiguredLogger is DEBUG
+ LogManager.getLogManager().getLogger("ConfiguredLogger").log(TEST_RECORD);
+ LogManager.getLogManager().getLogger("ConfiguredLogger").log(TEST_RECORD_WITH_EXCEPTION);
+ client.close();
+
+ // Verify that after close, log levels are reset.
+ assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
+ assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+ assertTrue(clientClosedStream.get());
+ assertTrue(channel.isShutdown());
+ assertThat(values, contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION));
+ } finally {
+ server.shutdownNow();
+ }
+ }
+
+ private <ReqT, RespT> StreamObserver<RespT> createStreamForTest(
+ Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+ StreamObserver<ReqT> handler) {
+ return clientFactory.apply(handler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java
new file mode 100644
index 0000000..3dd1b42
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link AdvancingPhaser}. */
+@RunWith(JUnit4.class)
+public class AdvancingPhaserTest {
+ @Test
+ public void testAdvancement() throws Exception {
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ int currentPhase = phaser.getPhase();
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(phaser::arrive);
+ phaser.awaitAdvance(currentPhase);
+ assertFalse(phaser.isTerminated());
+ service.shutdown();
+ if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
+ assertThat(service.shutdownNow(), empty());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
new file mode 100644
index 0000000..76b7ef0
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.test.TestExecutors;
+import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BufferingStreamObserver}. */
+@RunWith(JUnit4.class)
+public class BufferingStreamObserverTest {
+ @Rule public TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+ @Test
+ public void testThreadSafety() throws Exception {
+ List<String> onNextValues = new ArrayList<>();
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
+ final BufferingStreamObserver<String> streamObserver =
+ new BufferingStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ // Use the atomic boolean to detect if multiple threads are in this
+ // critical section. Any thread that enters purposefully blocks by sleeping
+ // to increase the contention between threads artificially.
+ assertFalse(isCriticalSectionShared.getAndSet(true));
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ onNextValues.add(t);
+ assertTrue(isCriticalSectionShared.getAndSet(false));
+ }
+ }).build(),
+ executor,
+ 3);
+
+ List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
+ List<Callable<String>> tasks = new ArrayList<>();
+ for (String prefix : prefixes) {
+ tasks.add(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ });
+ }
+ List<Future<String>> results = executor.invokeAll(tasks);
+ for (Future<String> result : results) {
+ result.get();
+ }
+ streamObserver.onCompleted();
+
+ // Check that order was maintained.
+ int[] prefixesIndex = new int[prefixes.size()];
+ assertEquals(50, onNextValues.size());
+ for (String onNextValue : onNextValues) {
+ int prefix = Integer.parseInt(onNextValue.substring(0, 1));
+ int suffix = Integer.parseInt(onNextValue.substring(1, 2));
+ assertEquals(prefixesIndex[prefix], suffix);
+ prefixesIndex[prefix] += 1;
+ }
+ }
+
+ @Test
+ public void testIsReadyIsHonored() throws Exception {
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean elementsAllowed = new AtomicBoolean();
+ final BufferingStreamObserver<String> streamObserver =
+ new BufferingStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ assertTrue(elementsAllowed.get());
+ }
+ }).withIsReady(elementsAllowed::get).build(),
+ executor,
+ 3);
+
+ // Start all the tasks
+ List<Future<String>> results = new ArrayList<>();
+ for (String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
+ results.add(
+ executor.submit(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ }));
+ }
+
+ // Have them wait and then flip that we do allow elements and wake up those awaiting
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ elementsAllowed.set(true);
+ phaser.arrive();
+
+ for (Future<String> result : results) {
+ result.get();
+ }
+ streamObserver.onCompleted();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
new file mode 100644
index 0000000..b5d3ec1
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.test.TestExecutors;
+import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DirectStreamObserver}. */
+@RunWith(JUnit4.class)
+public class DirectStreamObserverTest {
+ @Rule public TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+ @Test
+ public void testThreadSafety() throws Exception {
+ List<String> onNextValues = new ArrayList<>();
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
+ final DirectStreamObserver<String> streamObserver =
+ new DirectStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ // Use the atomic boolean to detect if multiple threads are in this
+ // critical section. Any thread that enters purposefully blocks by sleeping
+ // to increase the contention between threads artificially.
+ assertFalse(isCriticalSectionShared.getAndSet(true));
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ onNextValues.add(t);
+ assertTrue(isCriticalSectionShared.getAndSet(false));
+ }
+ }).build());
+
+ List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
+ List<Callable<String>> tasks = new ArrayList<>();
+ for (String prefix : prefixes) {
+ tasks.add(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ });
+ }
+ executor.invokeAll(tasks);
+ streamObserver.onCompleted();
+
+ // Check that order was maintained.
+ int[] prefixesIndex = new int[prefixes.size()];
+ assertEquals(50, onNextValues.size());
+ for (String onNextValue : onNextValues) {
+ int prefix = Integer.parseInt(onNextValue.substring(0, 1));
+ int suffix = Integer.parseInt(onNextValue.substring(1, 2));
+ assertEquals(prefixesIndex[prefix], suffix);
+ prefixesIndex[prefix] += 1;
+ }
+ }
+
+ @Test
+ public void testIsReadyIsHonored() throws Exception {
+ AdvancingPhaser phaser = new AdvancingPhaser(1);
+ final AtomicBoolean elementsAllowed = new AtomicBoolean();
+ final DirectStreamObserver<String> streamObserver =
+ new DirectStreamObserver<>(
+ phaser,
+ TestStreams.withOnNext(
+ new Consumer<String>() {
+ @Override
+ public void accept(String t) {
+ assertTrue(elementsAllowed.get());
+ }
+ }).withIsReady(elementsAllowed::get).build());
+
+ // Start all the tasks
+ List<Future<String>> results = new ArrayList<>();
+ for (String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
+ results.add(
+ executor.submit(
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ streamObserver.onNext(prefix + i);
+ }
+ return prefix;
+ }
+ }));
+ }
+
+ // Have them wait and then flip that we do allow elements and wake up those awaiting
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ elementsAllowed.set(true);
+ phaser.arrive();
+
+ for (Future<String> result : results) {
+ result.get();
+ }
+ streamObserver.onCompleted();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java
new file mode 100644
index 0000000..598d7f3
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Tests for {@link ForwardingClientResponseObserver}. */
+@RunWith(JUnit4.class)
+public class ForwardingClientResponseObserverTest {
+ @Test
+ public void testCallsAreForwardedAndOnReadyHandlerBound() {
+ @SuppressWarnings("unchecked")
+ StreamObserver<Object> delegateObserver = Mockito.mock(StreamObserver.class);
+ @SuppressWarnings("unchecked")
+ ClientCallStreamObserver<Object> callStreamObserver =
+ Mockito.mock(ClientCallStreamObserver.class);
+ Runnable onReadyHandler = new Runnable() {
+ @Override
+ public void run() {
+ }
+ };
+ ClientResponseObserver<Object, Object> observer =
+ new ForwardingClientResponseObserver<>(delegateObserver, onReadyHandler);
+ observer.onNext("A");
+ verify(delegateObserver).onNext("A");
+ Throwable t = new RuntimeException();
+ observer.onError(t);
+ verify(delegateObserver).onError(t);
+ observer.onCompleted();
+ verify(delegateObserver).onCompleted();
+ observer.beforeStart(callStreamObserver);
+ verify(callStreamObserver).setOnReadyHandler(onReadyHandler);
+ verifyNoMoreInteractions(delegateObserver, callStreamObserver);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
new file mode 100644
index 0000000..9331079
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link StreamObserverFactory}. */
+@RunWith(JUnit4.class)
+public class StreamObserverFactoryTest {
+ @Mock private StreamObserver<Integer> mockRequestObserver;
+ @Mock private CallStreamObserver<String> mockResponseObserver;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testDefaultInstantiation() {
+ StreamObserver<String> observer =
+ StreamObserverFactory.fromOptions(PipelineOptionsFactory.create())
+ .from(this::fakeFactory, mockRequestObserver);
+ assertThat(observer, instanceOf(DirectStreamObserver.class));
+ }
+
+ @Test
+ public void testBufferedStreamInstantiation() {
+ StreamObserver<String> observer =
+ StreamObserverFactory.fromOptions(
+ PipelineOptionsFactory.fromArgs(
+ new String[] {"--experiments=beam_fn_api_buffered_stream"})
+ .create())
+ .from(this::fakeFactory, mockRequestObserver);
+ assertThat(observer, instanceOf(BufferingStreamObserver.class));
+ }
+
+ @Test
+ public void testBufferedStreamWithLimitInstantiation() {
+ StreamObserver<String> observer =
+ StreamObserverFactory.fromOptions(
+ PipelineOptionsFactory.fromArgs(
+ new String[] {
+ "--experiments=beam_fn_api_buffered_stream,"
+ + "beam_fn_api_buffered_stream_buffer_size=1"
+ })
+ .create())
+ .from(this::fakeFactory, mockRequestObserver);
+ assertThat(observer, instanceOf(BufferingStreamObserver.class));
+ assertEquals(1, ((BufferingStreamObserver<String>) observer).getBufferSize());
+ }
+
+ private StreamObserver<String> fakeFactory(StreamObserver<Integer> observer) {
+ assertThat(observer, instanceOf(ForwardingClientResponseObserver.class));
+ return mockResponseObserver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java
new file mode 100644
index 0000000..f846466
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.test;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * A {@link TestRule} that validates that all submitted tasks finished and were completed. This
+ * allows for testing that tasks have exercised the appropriate shutdown logic.
+ */
+public class TestExecutors {
+ public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
+ return new FromSupplier(executorServiceSuppler);
+ }
+
+ /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */
+ public interface TestExecutorService extends ExecutorService, TestRule {}
+
+ private static class FromSupplier extends ForwardingExecutorService
+ implements TestExecutorService {
+ private final Supplier<ExecutorService> executorServiceSupplier;
+ private ExecutorService delegate;
+
+ private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) {
+ this.executorServiceSupplier = executorServiceSupplier;
+ }
+
+ @Override
+ public Statement apply(Statement statement, Description arg1) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ Throwable thrown = null;
+ delegate = executorServiceSupplier.get();
+ try {
+ statement.evaluate();
+ } catch (Throwable t) {
+ thrown = t;
+ }
+ shutdown();
+ if (!awaitTermination(5, TimeUnit.SECONDS)) {
+ shutdownNow();
+ IllegalStateException e =
+ new IllegalStateException("Test executor failed to shutdown cleanly.");
+ if (thrown != null) {
+ thrown.addSuppressed(e);
+ } else {
+ thrown = e;
+ }
+ }
+ if (thrown != null) {
+ throw thrown;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return delegate;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
new file mode 100644
index 0000000..85c64d0
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+/** Tests for {@link TestExecutors}. */
+@RunWith(JUnit4.class)
+public class TestExecutorsTest {
+ @Test
+ public void testSuccessfulTermination() throws Throwable {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ final TestExecutorService testService = TestExecutors.from(() -> service);
+ final AtomicBoolean taskRan = new AtomicBoolean();
+ testService
+ .apply(
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ testService.submit(() -> taskRan.set(true));
+ }
+ },
+ null)
+ .evaluate();
+ assertTrue(service.isTerminated());
+ assertTrue(taskRan.get());
+ }
+
+ @Test
+ public void testTaskBlocksForeverCausesFailure() throws Throwable {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ final TestExecutorService testService = TestExecutors.from(() -> service);
+ final AtomicBoolean taskStarted = new AtomicBoolean();
+ final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
+ try {
+ testService
+ .apply(
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ testService.submit(this::taskToRun);
+ }
+
+ private void taskToRun() {
+ taskStarted.set(true);
+ try {
+ while (true) {
+ Thread.sleep(10000);
+ }
+ } catch (InterruptedException e) {
+ taskWasInterrupted.set(true);
+ return;
+ }
+ }
+ },
+ null)
+ .evaluate();
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals(IllegalStateException.class, e.getClass());
+ assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
+ }
+ assertTrue(service.isShutdown());
+ }
+
+ @Test
+ public void testStatementFailurePropagatedCleanly() throws Throwable {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ final TestExecutorService testService = TestExecutors.from(() -> service);
+ final RuntimeException exceptionToThrow = new RuntimeException();
+ try {
+ testService
+ .apply(
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ throw exceptionToThrow;
+ }
+ },
+ null)
+ .evaluate();
+ fail();
+ } catch (RuntimeException thrownException) {
+ assertSame(exceptionToThrow, thrownException);
+ }
+ assertTrue(service.isShutdown());
+ }
+
+ @Test
+ public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
+ throws Throwable {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ final TestExecutorService testService = TestExecutors.from(() -> service);
+ final AtomicBoolean taskStarted = new AtomicBoolean();
+ final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
+ final RuntimeException exceptionToThrow = new RuntimeException();
+ try {
+ testService
+ .apply(
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ testService.submit(this::taskToRun);
+ throw exceptionToThrow;
+ }
+
+ private void taskToRun() {
+ taskStarted.set(true);
+ try {
+ while (true) {
+ Thread.sleep(10000);
+ }
+ } catch (InterruptedException e) {
+ taskWasInterrupted.set(true);
+ return;
+ }
+ }
+ },
+ null)
+ .evaluate();
+ fail();
+ } catch (RuntimeException thrownException) {
+ assertSame(exceptionToThrow, thrownException);
+ assertEquals(1, exceptionToThrow.getSuppressed().length);
+ assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
+ assertEquals(
+ "Test executor failed to shutdown cleanly.",
+ exceptionToThrow.getSuppressed()[0].getMessage());
+ }
+ assertTrue(service.isShutdown());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
new file mode 100644
index 0000000..f398286
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.test;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Utility methods which enable testing of {@link StreamObserver}s. */
+public class TestStreams {
+ /**
+ * Creates a test {@link CallStreamObserver} {@link Builder} that forwards
+ * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
+ */
+ public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
+ return new Builder<>(new ForwardingCallStreamObserver<>(
+ onNext,
+ TestStreams::noop,
+ TestStreams::noop,
+ TestStreams::returnTrue));
+ }
+
+ /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
+ public static class Builder<T> {
+ private final ForwardingCallStreamObserver<T> observer;
+ private Builder(ForwardingCallStreamObserver<T> observer) {
+ this.observer = observer;
+ }
+
+ /**
+ * Returns a new {@link Builder} like this one with the specified
+ * {@link CallStreamObserver#isReady} callback.
+ */
+ public Builder<T> withIsReady(Supplier<Boolean> isReady) {
+ return new Builder<>(new ForwardingCallStreamObserver<>(
+ observer.onNext,
+ observer.onError,
+ observer.onCompleted,
+ isReady));
+ }
+
+ /**
+ * Returns a new {@link Builder} like this one with the specified
+ * {@link StreamObserver#onCompleted} callback.
+ */
+ public Builder<T> withOnCompleted(Runnable onCompleted) {
+ return new Builder<>(new ForwardingCallStreamObserver<>(
+ observer.onNext,
+ observer.onError,
+ onCompleted,
+ observer.isReady));
+ }
+
+ /**
+ * Returns a new {@link Builder} like this one with the specified
+ * {@link StreamObserver#onError} callback.
+ */
+ public Builder<T> withOnError(Runnable onError) {
+ return new Builder<>(new ForwardingCallStreamObserver<>(
+ observer.onNext,
+ new Consumer<Throwable>() {
+ @Override
+ public void accept(Throwable t) {
+ onError.run();
+ }
+ },
+ observer.onCompleted,
+ observer.isReady));
+ }
+
+ /**
+ * Returns a new {@link Builder} like this one with the specified
+ * {@link StreamObserver#onError} consumer.
+ */
+ public Builder<T> withOnError(Consumer<Throwable> onError) {
+ return new Builder<>(new ForwardingCallStreamObserver<>(
+ observer.onNext, onError, observer.onCompleted, observer.isReady));
+ }
+
+ public CallStreamObserver<T> build() {
+ return observer;
+ }
+ }
+
+ private static void noop() {
+ }
+
+ private static void noop(Throwable t) {
+ }
+
+ private static boolean returnTrue() {
+ return true;
+ }
+
+ /** A {@link CallStreamObserver} which executes the supplied callbacks. */
+ private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
+ private final Consumer<T> onNext;
+ private final Supplier<Boolean> isReady;
+ private final Consumer<Throwable> onError;
+ private final Runnable onCompleted;
+
+ public ForwardingCallStreamObserver(
+ Consumer<T> onNext,
+ Consumer<Throwable> onError,
+ Runnable onCompleted,
+ Supplier<Boolean> isReady) {
+ this.onNext = onNext;
+ this.onError = onError;
+ this.onCompleted = onCompleted;
+ this.isReady = isReady;
+ }
+
+ @Override
+ public void onNext(T value) {
+ onNext.accept(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ onError.accept(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ onCompleted.run();
+ }
+
+ @Override
+ public boolean isReady() {
+ return isReady.get();
+ }
+
+ @Override
+ public void setOnReadyHandler(Runnable onReadyHandler) {}
+
+ @Override
+ public void disableAutoInboundFlowControl() {}
+
+ @Override
+ public void request(int count) {}
+
+ @Override
+ public void setMessageCompression(boolean enable) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
new file mode 100644
index 0000000..b684c90
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TestStreams}. */
+@RunWith(JUnit4.class)
+public class TestStreamsTest {
+ @Test
+ public void testOnNextIsCalled() {
+ AtomicBoolean onNextWasCalled = new AtomicBoolean();
+ TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
+ assertTrue(onNextWasCalled.get());
+ }
+
+ @Test
+ public void testIsReadyIsCalled() {
+ final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
+ assertFalse(TestStreams.withOnNext(null)
+ .withIsReady(() -> isReadyWasCalled.getAndSet(true))
+ .build()
+ .isReady());
+ assertTrue(isReadyWasCalled.get());
+ }
+
+ @Test
+ public void testOnCompletedIsCalled() {
+ AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+ TestStreams.withOnNext(null)
+ .withOnCompleted(() -> onCompletedWasCalled.set(true))
+ .build()
+ .onCompleted();
+ assertTrue(onCompletedWasCalled.get());
+ }
+
+ @Test
+ public void testOnErrorRunnableIsCalled() {
+ RuntimeException throwable = new RuntimeException();
+ AtomicBoolean onErrorWasCalled = new AtomicBoolean();
+ TestStreams.withOnNext(null)
+ .withOnError(() -> onErrorWasCalled.set(true))
+ .build()
+ .onError(throwable);
+ assertTrue(onErrorWasCalled.get());
+ }
+
+ @Test
+ public void testOnErrorConsumerIsCalled() {
+ RuntimeException throwable = new RuntimeException();
+ Collection<Throwable> onErrorWasCalled = new ArrayList<>();
+ TestStreams.withOnNext(null)
+ .withOnError(onErrorWasCalled::add)
+ .build()
+ .onError(throwable);
+ assertThat(onErrorWasCalled, contains(throwable));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
new file mode 100644
index 0000000..511cc3f
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.test.TestExecutors;
+import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link BeamFnDataReadRunner}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataReadRunnerTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
+ .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
+ private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
+ .setData(Any.pack(PORT_SPEC)).build();
+ private static final Coder<WindowedValue<String>> CODER =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+ private static final BeamFnApi.Coder CODER_SPEC;
+ static {
+ try {
+ CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+ .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
+ .build();
+ } catch (IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+ private static final BeamFnApi.Target INPUT_TARGET = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(1)
+ .setName("out")
+ .build();
+
+ @Rule public TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+ @Mock private BeamFnDataClient mockBeamFnDataClientFactory;
+ @Captor private ArgumentCaptor<ThrowingConsumer<WindowedValue<String>>> consumerCaptor;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testReuseForMultipleBundles() throws Exception {
+ CompletableFuture<Void> bundle1Future = new CompletableFuture<>();
+ CompletableFuture<Void> bundle2Future = new CompletableFuture<>();
+ when(mockBeamFnDataClientFactory.forInboundConsumer(
+ any(),
+ any(),
+ any(),
+ any())).thenReturn(bundle1Future).thenReturn(bundle2Future);
+ List<WindowedValue<String>> valuesA = new ArrayList<>();
+ List<WindowedValue<String>> valuesB = new ArrayList<>();
+ Map<String, Collection<ThrowingConsumer<WindowedValue<String>>>> outputMap = ImmutableMap.of(
+ "outA", ImmutableList.of(valuesA::add),
+ "outB", ImmutableList.of(valuesB::add));
+ AtomicLong bundleId = new AtomicLong(0);
+ BeamFnDataReadRunner<String> readRunner = new BeamFnDataReadRunner<>(
+ FUNCTION_SPEC,
+ bundleId::get,
+ INPUT_TARGET,
+ CODER_SPEC,
+ mockBeamFnDataClientFactory,
+ outputMap);
+
+ // Process for bundle id 0
+ readRunner.registerInputLocation();
+
+ verify(mockBeamFnDataClientFactory).forInboundConsumer(
+ eq(PORT_SPEC.getApiServiceDescriptor()),
+ eq(KV.of(bundleId.get(), INPUT_TARGET)),
+ eq(CODER),
+ consumerCaptor.capture());
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ // Sleep for some small amount of time simulating the parent blocking
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ try {
+ consumerCaptor.getValue().accept(valueInGlobalWindow("ABC"));
+ consumerCaptor.getValue().accept(valueInGlobalWindow("DEF"));
+ } catch (Exception e) {
+ bundle1Future.completeExceptionally(e);
+ } finally {
+ bundle1Future.complete(null);
+ }
+ }
+ });
+
+ readRunner.blockTillReadFinishes();
+ assertThat(valuesA, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF")));
+ assertThat(valuesB, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF")));
+
+ // Process for bundle id 1
+ bundleId.incrementAndGet();
+ valuesA.clear();
+ valuesB.clear();
+ readRunner.registerInputLocation();
+
+ verify(mockBeamFnDataClientFactory).forInboundConsumer(
+ eq(PORT_SPEC.getApiServiceDescriptor()),
+ eq(KV.of(bundleId.get(), INPUT_TARGET)),
+ eq(CODER),
+ consumerCaptor.capture());
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ // Sleep for some small amount of time simulating the parent blocking
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ try {
+ consumerCaptor.getValue().accept(valueInGlobalWindow("GHI"));
+ consumerCaptor.getValue().accept(valueInGlobalWindow("JKL"));
+ } catch (Exception e) {
+ bundle2Future.completeExceptionally(e);
+ } finally {
+ bundle2Future.complete(null);
+ }
+ }
+ });
+
+ readRunner.blockTillReadFinishes();
+ assertThat(valuesA, contains(valueInGlobalWindow("GHI"), valueInGlobalWindow("JKL")));
+ assertThat(valuesB, contains(valueInGlobalWindow("GHI"), valueInGlobalWindow("JKL")));
+
+ verifyNoMoreInteractions(mockBeamFnDataClientFactory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
new file mode 100644
index 0000000..ed67b14
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link BeamFnDataWriteRunner}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataWriteRunnerTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
+ .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
+ private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
+ .setData(Any.pack(PORT_SPEC)).build();
+ private static final Coder<WindowedValue<String>> CODER =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+ private static final BeamFnApi.Coder CODER_SPEC;
+ static {
+ try {
+ CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+ .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
+ .build();
+ } catch (IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+ private static final BeamFnApi.Target OUTPUT_TARGET = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(1)
+ .setName("out")
+ .build();
+
+ @Mock private BeamFnDataClient mockBeamFnDataClientFactory;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testReuseForMultipleBundles() throws Exception {
+ RecordingConsumer<WindowedValue<String>> valuesA = new RecordingConsumer<>();
+ RecordingConsumer<WindowedValue<String>> valuesB = new RecordingConsumer<>();
+ when(mockBeamFnDataClientFactory.forOutboundConsumer(
+ any(),
+ any(),
+ Matchers.<Coder<WindowedValue<String>>>any())).thenReturn(valuesA).thenReturn(valuesB);
+ AtomicLong bundleId = new AtomicLong(0);
+ BeamFnDataWriteRunner<String> writeRunner = new BeamFnDataWriteRunner<>(
+ FUNCTION_SPEC,
+ bundleId::get,
+ OUTPUT_TARGET,
+ CODER_SPEC,
+ mockBeamFnDataClientFactory);
+
+ // Process for bundle id 0
+ writeRunner.registerForOutput();
+
+ verify(mockBeamFnDataClientFactory).forOutboundConsumer(
+ eq(PORT_SPEC.getApiServiceDescriptor()),
+ eq(KV.of(bundleId.get(), OUTPUT_TARGET)),
+ eq(CODER));
+
+ writeRunner.consume(valueInGlobalWindow("ABC"));
+ writeRunner.consume(valueInGlobalWindow("DEF"));
+ writeRunner.close();
+
+ assertTrue(valuesA.closed);
+ assertThat(valuesA, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF")));
+
+ // Process for bundle id 1
+ bundleId.incrementAndGet();
+ valuesA.clear();
+ valuesB.clear();
+ writeRunner.registerForOutput();
+
+ verify(mockBeamFnDataClientFactory).forOutboundConsumer(
+ eq(PORT_SPEC.getApiServiceDescriptor()),
+ eq(KV.of(bundleId.get(), OUTPUT_TARGET)),
+ eq(CODER));
+
+ writeRunner.consume(valueInGlobalWindow("GHI"));
+ writeRunner.consume(valueInGlobalWindow("JKL"));
+ writeRunner.close();
+
+ assertTrue(valuesB.closed);
+ assertThat(valuesB, contains(valueInGlobalWindow("GHI"), valueInGlobalWindow("JKL")));
+ verifyNoMoreInteractions(mockBeamFnDataClientFactory);
+ }
+
+ private static class RecordingConsumer<T> extends ArrayList<T>
+ implements CloseableThrowingConsumer<T> {
+ private boolean closed;
+ @Override
+ public void close() throws Exception {
+ closed = true;
+ }
+
+ @Override
+ public void accept(T t) throws Exception {
+ if (closed) {
+ throw new IllegalStateException("Consumer is closed but attempting to consume " + t);
+ }
+ add(t);
+ }
+
+ }
+}