You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/30 20:48:54 UTC

[3/6] beam git commit: A proposal for a portability framework to execute user definable functions.

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
new file mode 100644
index 0000000..92042d0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data
+ * to all consumers in the specified output map.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerInputLocation()} to start and call
+ * {@link #blockTillReadFinishes()} to finish.
+ */
+public class BeamFnDataReadRunner<OutputT> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+  private final Supplier<Long> processBundleInstructionIdSupplier;
+  private final BeamFnDataClient beamFnDataClientFactory;
+  private final Coder<WindowedValue<OutputT>> coder;
+  private final BeamFnApi.Target inputTarget;
+
+  private CompletableFuture<Void> readFuture;
+
+  public BeamFnDataReadRunner(
+      BeamFnApi.FunctionSpec functionSpec,
+      Supplier<Long> processBundleInstructionIdSupplier,
+      BeamFnApi.Target inputTarget,
+      BeamFnApi.Coder coderSpec,
+      BeamFnDataClient beamFnDataClientFactory,
+      Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap)
+          throws IOException {
+    this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class)
+        .getApiServiceDescriptor();
+    this.inputTarget = inputTarget;
+    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+    this.beamFnDataClientFactory = beamFnDataClientFactory;
+    this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
+
+    @SuppressWarnings("unchecked")
+    Coder<WindowedValue<OutputT>> coder = Serializer.deserialize(
+        OBJECT_MAPPER.readValue(
+            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
+            Map.class),
+        Coder.class);
+    this.coder = coder;
+  }
+
+  public void registerInputLocation() {
+    this.readFuture = beamFnDataClientFactory.forInboundConsumer(
+        apiServiceDescriptor,
+        KV.of(processBundleInstructionIdSupplier.get(), inputTarget),
+        coder,
+        this::multiplexToConsumers);
+  }
+
+  public void blockTillReadFinishes() throws Exception {
+    LOGGER.debug("Waiting for process bundle instruction {} and target {} to close.",
+        processBundleInstructionIdSupplier.get(), inputTarget);
+    readFuture.get();
+  }
+
+  private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception {
+    for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+      consumer.accept(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
new file mode 100644
index 0000000..596afe5
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Registers as a consumer with the Beam Fn Data API. Propagates and elements consumed to
+ * the the registered consumer.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
+ */
+public class BeamFnDataWriteRunner<InputT> {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final BeamFnApi.Target outputTarget;
+  private final Coder<WindowedValue<InputT>> coder;
+  private final BeamFnDataClient beamFnDataClientFactory;
+  private final Supplier<Long> processBundleInstructionIdSupplier;
+
+  private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
+
+  public BeamFnDataWriteRunner(
+      BeamFnApi.FunctionSpec functionSpec,
+      Supplier<Long> processBundleInstructionIdSupplier,
+      BeamFnApi.Target outputTarget,
+      BeamFnApi.Coder coderSpec,
+      BeamFnDataClient beamFnDataClientFactory)
+          throws IOException {
+    this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class)
+        .getApiServiceDescriptor();
+    this.beamFnDataClientFactory = beamFnDataClientFactory;
+    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+    this.outputTarget = outputTarget;
+
+    @SuppressWarnings("unchecked")
+    Coder<WindowedValue<InputT>> coder = Serializer.deserialize(
+        OBJECT_MAPPER.readValue(
+            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
+            Map.class),
+        Coder.class);
+    this.coder = coder;
+  }
+
+  public void registerForOutput() {
+    consumer = beamFnDataClientFactory.forOutboundConsumer(
+        apiServiceDescriptor,
+        KV.of(processBundleInstructionIdSupplier.get(), outputTarget),
+        coder);
+  }
+
+  public void close() throws Exception {
+    consumer.close();
+  }
+
+  public void consume(WindowedValue<InputT> value) throws Exception {
+    consumer.accept(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
new file mode 100644
index 0000000..9d9c433
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A runner which creates {@link Reader}s for each {@link BoundedSource} and executes
+ * the {@link Reader}s read loop.
+ */
+public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
+  private final PipelineOptions pipelineOptions;
+  private final BeamFnApi.FunctionSpec definition;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+
+  public BoundedSourceRunner(
+      PipelineOptions pipelineOptions,
+      BeamFnApi.FunctionSpec definition,
+      Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+    this.pipelineOptions = pipelineOptions;
+    this.definition = definition;
+    this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
+  }
+
+  /**
+   * The runner harness is meant to send the source over the Beam Fn Data API which would be
+   * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the
+   * source instead of unpacking it from the data block of the function specification.
+   */
+  @Deprecated
+  public void start() throws Exception {
+    try {
+      // The representation here is defined as the java serialized representation of the
+      // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
+      byte[] bytes = definition.getData().unpack(BytesValue.class).getValue().toByteArray();
+      @SuppressWarnings("unchecked")
+      InputT boundedSource =
+          (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
+      runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
+    } catch (InvalidProtocolBufferException e) {
+      throw new IOException(
+          String.format("Failed to decode %s, expected %s",
+              definition.getData().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
+          e);
+    }
+  }
+
+  /**
+   * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s
+   * read loop. See {@link Reader} for further details of the read loop.
+   *
+   * <p>Propagates any exceptions caused during reading or processing via a consumer to the
+   * caller.
+   */
+  public void runReadLoop(WindowedValue<InputT> value) throws Exception {
+    try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
+      if (!reader.start()) {
+        // Reader has no data, immediately return
+        return;
+      }
+      do {
+        // TODO: Should this use the input window as the window for all the outputs?
+        WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow(
+            reader.getCurrent(), reader.getCurrentTimestamp());
+        for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+          consumer.accept(nextValue);
+        }
+      } while (reader.advance());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return definition.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/package-info.java
new file mode 100644
index 0000000..d250a6a
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides utilities for Beam runner authors.
+ */
+package org.apache.beam.runners.core;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
new file mode 100644
index 0000000..ff05225
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.fn.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.fn.v1.BeamFnApi.LogControl;
+import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FnHarness}. */
+@RunWith(JUnit4.class)
+public class FnHarnessTest {
+  private static final BeamFnApi.InstructionRequest INSTRUCTION_REQUEST =
+      BeamFnApi.InstructionRequest.newBuilder()
+      .setInstructionId(999L)
+      .setRegister(BeamFnApi.RegisterRequest.getDefaultInstance())
+      .build();
+  private static final BeamFnApi.InstructionResponse INSTRUCTION_RESPONSE =
+      BeamFnApi.InstructionResponse.newBuilder()
+      .setInstructionId(999L)
+      .setRegister(BeamFnApi.RegisterResponse.getDefaultInstance())
+      .build();
+
+  @Test
+  public void testLaunchFnHarnessAndTeardownCleanly() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<BeamFnApi.LogEntry> logEntries = new ArrayList<>();
+    List<BeamFnApi.InstructionResponse> instructionResponses = new ArrayList<>();
+
+    BeamFnLoggingGrpc.BeamFnLoggingImplBase loggingService =
+        new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+      @Override
+      public StreamObserver<BeamFnApi.LogEntry.List> logging(
+          StreamObserver<LogControl> responseObserver) {
+        return TestStreams.withOnNext(
+            (BeamFnApi.LogEntry.List entries) -> logEntries.addAll(entries.getLogEntriesList()))
+            .withOnCompleted(() -> responseObserver.onCompleted())
+            .build();
+      }
+    };
+
+    BeamFnControlGrpc.BeamFnControlImplBase controlService =
+        new BeamFnControlGrpc.BeamFnControlImplBase() {
+      @Override
+      public StreamObserver<InstructionResponse> control(
+          StreamObserver<InstructionRequest> responseObserver) {
+        CountDownLatch waitForResponses = new CountDownLatch(1 /* number of responses expected */);
+        options.as(GcsOptions.class).getExecutorService().submit(new Runnable() {
+          @Override
+          public void run() {
+            responseObserver.onNext(INSTRUCTION_REQUEST);
+            Uninterruptibles.awaitUninterruptibly(waitForResponses);
+            responseObserver.onCompleted();
+          }
+        });
+        return TestStreams.withOnNext(new Consumer<BeamFnApi.InstructionResponse>() {
+          @Override
+          public void accept(InstructionResponse t) {
+            instructionResponses.add(t);
+            waitForResponses.countDown();
+          }
+        }).withOnCompleted(waitForResponses::countDown).build();
+      }
+    };
+
+    Server loggingServer = ServerBuilder.forPort(0).addService(loggingService).build();
+    loggingServer.start();
+    try {
+      Server controlServer = ServerBuilder.forPort(0).addService(controlService).build();
+      controlServer.start();
+      try {
+        BeamFnApi.ApiServiceDescriptor loggingDescriptor = BeamFnApi.ApiServiceDescriptor
+            .newBuilder()
+            .setId(1L)
+            .setUrl("localhost:" + loggingServer.getPort())
+            .build();
+        BeamFnApi.ApiServiceDescriptor controlDescriptor = BeamFnApi.ApiServiceDescriptor
+            .newBuilder()
+            .setId(2L)
+            .setUrl("localhost:" + controlServer.getPort())
+            .build();
+
+        FnHarness.main(options, loggingDescriptor, controlDescriptor);
+        assertThat(instructionResponses, contains(INSTRUCTION_RESPONSE));
+      } finally {
+        controlServer.shutdownNow();
+      }
+    } finally {
+      loggingServer.shutdownNow();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
new file mode 100644
index 0000000..9f634c9
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.channel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import io.grpc.ManagedChannel;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ManagedChannelFactory}. */
+@RunWith(JUnit4.class)
+public class ManagedChannelFactoryTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testDefaultChannel() {
+    ApiServiceDescriptor apiServiceDescriptor = ApiServiceDescriptor.newBuilder()
+        .setUrl("localhost:123")
+        .build();
+    ManagedChannel channel = ManagedChannelFactory.from(PipelineOptionsFactory.create())
+        .forDescriptor(apiServiceDescriptor);
+    assertEquals("localhost:123", channel.authority());
+    channel.shutdownNow();
+  }
+
+  @Test
+  public void testEpollHostPortChannel() {
+    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
+    ApiServiceDescriptor apiServiceDescriptor = ApiServiceDescriptor.newBuilder()
+        .setUrl("localhost:123")
+        .build();
+    ManagedChannel channel = ManagedChannelFactory.from(
+        PipelineOptionsFactory.fromArgs(new String[]{ "--experiments=beam_fn_api_epoll" }).create())
+        .forDescriptor(apiServiceDescriptor);
+    assertEquals("localhost:123", channel.authority());
+    channel.shutdownNow();
+  }
+
+  @Test
+  public void testEpollDomainSocketChannel() throws Exception {
+    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
+    ApiServiceDescriptor apiServiceDescriptor = ApiServiceDescriptor.newBuilder()
+        .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath())
+        .build();
+    ManagedChannel channel = ManagedChannelFactory.from(
+        PipelineOptionsFactory.fromArgs(new String[]{ "--experiments=beam_fn_api_epoll" }).create())
+        .forDescriptor(apiServiceDescriptor);
+    assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), channel.authority());
+    channel.shutdownNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java
new file mode 100644
index 0000000..610a8ea
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.channel;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SocketAddressFactory}. */
+@RunWith(JUnit4.class)
+public class SocketAddressFactoryTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testHostPortSocket() {
+    SocketAddress socketAddress = SocketAddressFactory.createFrom("localhost:123");
+    assertThat(socketAddress, instanceOf(InetSocketAddress.class));
+    assertEquals("localhost", ((InetSocketAddress) socketAddress).getHostString());
+    assertEquals(123, ((InetSocketAddress) socketAddress).getPort());
+  }
+
+  @Test
+  public void testDomainSocket() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    SocketAddress socketAddress = SocketAddressFactory.createFrom(
+        "unix://" + tmpFile.getAbsolutePath());
+    assertThat(socketAddress, instanceOf(DomainSocketAddress.class));
+    assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) socketAddress).path());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
new file mode 100644
index 0000000..fc3af49
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.EnumMap;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.ThrowingFunction;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnControlClient}. */
+@RunWith(JUnit4.class)
+public class BeamFnControlClientTest {
+  private static final BeamFnApi.InstructionRequest SUCCESSFUL_REQUEST =
+      BeamFnApi.InstructionRequest.newBuilder()
+      .setInstructionId(1L)
+      .setProcessBundle(BeamFnApi.ProcessBundleRequest.getDefaultInstance())
+      .build();
+  private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE =
+      BeamFnApi.InstructionResponse.newBuilder()
+      .setInstructionId(1L)
+      .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
+      .build();
+  private static final BeamFnApi.InstructionRequest UNKNOWN_HANDLER_REQUEST =
+      BeamFnApi.InstructionRequest.newBuilder()
+      .setInstructionId(2L)
+      .build();
+  private static final BeamFnApi.InstructionResponse UNKNOWN_HANDLER_RESPONSE =
+      BeamFnApi.InstructionResponse.newBuilder()
+      .setInstructionId(2L)
+      .setError("Unknown InstructionRequest type "
+          + BeamFnApi.InstructionRequest.RequestCase.REQUEST_NOT_SET)
+      .build();
+  private static final RuntimeException FAILURE = new RuntimeException("TestFailure");
+  private static final BeamFnApi.InstructionRequest FAILURE_REQUEST =
+      BeamFnApi.InstructionRequest.newBuilder()
+      .setInstructionId(3L)
+      .setRegister(BeamFnApi.RegisterRequest.getDefaultInstance())
+      .build();
+  private static final BeamFnApi.InstructionResponse FAILURE_RESPONSE =
+      BeamFnApi.InstructionResponse.newBuilder()
+      .setInstructionId(3L)
+      .setError(getStackTraceAsString(FAILURE))
+      .build();
+
+  @Test
+  public void testDelegation() throws Exception {
+    AtomicBoolean clientClosedStream = new AtomicBoolean();
+    BlockingQueue<BeamFnApi.InstructionResponse> values = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamObserver<BeamFnApi.InstructionRequest>> outboundServerObservers =
+        new LinkedBlockingQueue<>();
+    CallStreamObserver<BeamFnApi.InstructionResponse> inboundServerObserver =
+        TestStreams.withOnNext(values::add)
+        .withOnCompleted(() -> clientClosedStream.set(true)).build();
+
+    BeamFnApi.ApiServiceDescriptor apiServiceDescriptor =
+        BeamFnApi.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+            .build();
+    Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+        .addService(new BeamFnControlGrpc.BeamFnControlImplBase() {
+          @Override
+          public StreamObserver<BeamFnApi.InstructionResponse> control(
+              StreamObserver<BeamFnApi.InstructionRequest> outboundObserver) {
+            Uninterruptibles.putUninterruptibly(outboundServerObservers, outboundObserver);
+            return inboundServerObserver;
+          }
+        })
+        .build();
+    server.start();
+    try {
+      ManagedChannel channel =
+          InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+      EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+      ThrowingFunction<BeamFnApi.InstructionRequest,
+                       BeamFnApi.InstructionResponse.Builder>> handlers =
+                       new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
+      handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
+        new ThrowingFunction<BeamFnApi.InstructionRequest,
+                             BeamFnApi.InstructionResponse.Builder>() {
+          @Override
+          public BeamFnApi.InstructionResponse.Builder apply(BeamFnApi.InstructionRequest value)
+              throws Exception {
+            return BeamFnApi.InstructionResponse.newBuilder()
+                .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
+          }
+        });
+      handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
+          new ThrowingFunction<BeamFnApi.InstructionRequest,
+                               BeamFnApi.InstructionResponse.Builder>() {
+            @Override
+            public BeamFnApi.InstructionResponse.Builder apply(BeamFnApi.InstructionRequest value)
+                throws Exception {
+              throw FAILURE;
+            }
+          });
+
+      BeamFnControlClient client = new BeamFnControlClient(
+                apiServiceDescriptor,
+                (BeamFnApi.ApiServiceDescriptor descriptor) -> channel,
+                this::createStreamForTest,
+                handlers);
+
+      // Get the connected client and attempt to send and receive an instruction
+      StreamObserver<BeamFnApi.InstructionRequest> outboundServerObserver =
+          outboundServerObservers.take();
+
+      ExecutorService executor = Executors.newCachedThreadPool();
+      Future<Void> future = executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          client.processInstructionRequests(executor);
+          return null;
+        }
+      });
+
+      outboundServerObserver.onNext(SUCCESSFUL_REQUEST);
+      assertEquals(SUCCESSFUL_RESPONSE, values.take());
+
+      // Ensure that conversion of an unknown request type is properly converted to a
+      // failure response.
+      outboundServerObserver.onNext(UNKNOWN_HANDLER_REQUEST);
+      assertEquals(UNKNOWN_HANDLER_RESPONSE, values.take());
+
+      // Ensure that all exceptions are caught and translated to failures
+      outboundServerObserver.onNext(FAILURE_REQUEST);
+      assertEquals(FAILURE_RESPONSE, values.take());
+
+      // Ensure that the server completing the stream translates to the completable future
+      // being completed allowing for a successful shutdown of the client.
+      outboundServerObserver.onCompleted();
+      future.get();
+    } finally {
+      server.shutdownNow();
+    }
+  }
+
+  private <ReqT, RespT> StreamObserver<RespT> createStreamForTest(
+      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+      StreamObserver<ReqT> handler) {
+    return clientFactory.apply(handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
new file mode 100644
index 0000000..1d451b5
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ProcessBundleHandler}. */
+@RunWith(JUnit4.class)
+public class ProcessBundleHandlerTest {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Coder<WindowedValue<String>> STRING_CODER =
+      WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+  private static final long LONG_CODER_SPEC_ID = 998L;
+  private static final long STRING_CODER_SPEC_ID = 999L;
+  private static final BeamFnApi.RemoteGrpcPort REMOTE_PORT = BeamFnApi.RemoteGrpcPort.newBuilder()
+      .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.newBuilder()
+          .setId(58L)
+          .setUrl("TestUrl"))
+      .build();
+  private static final BeamFnApi.Coder LONG_CODER_SPEC;
+  private static final BeamFnApi.Coder STRING_CODER_SPEC;
+  static {
+    try {
+      STRING_CODER_SPEC =
+          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+          .setId(STRING_CODER_SPEC_ID)
+          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+              OBJECT_MAPPER.writeValueAsBytes(STRING_CODER.asCloudObject()))).build())))
+          .build();
+      LONG_CODER_SPEC =
+          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+          .setId(STRING_CODER_SPEC_ID)
+          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+              OBJECT_MAPPER.writeValueAsBytes(WindowedValue.getFullCoder(
+                  VarLongCoder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject()))).build())))
+          .build();
+    } catch (IOException e) {
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
+  private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
+  private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
+  private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private BeamFnDataClient beamFnDataClient;
+  @Captor private ArgumentCaptor<ThrowingConsumer<WindowedValue<String>>> consumerCaptor;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testOrderOfStartAndFinishCalls() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        .build();
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor);
+
+    List<BeamFnApi.PrimitiveTransform> transformsProcessed = new ArrayList<>();
+    List<String> orderOfOperations = new ArrayList<>();
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient) {
+      @Override
+      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
+          BeamFnApi.PrimitiveTransform primitiveTransform,
+          Supplier<Long> processBundleInstructionId,
+          Function<BeamFnApi.Target,
+                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
+          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Consumer<ThrowingRunnable> addStartFunction,
+          Consumer<ThrowingRunnable> addFinishFunction)
+          throws IOException {
+
+        assertEquals((Long) 999L, processBundleInstructionId.get());
+
+        transformsProcessed.add(primitiveTransform);
+        addStartFunction.accept(
+            () -> orderOfOperations.add("Start" + primitiveTransform.getId()));
+        addFinishFunction.accept(
+            () -> orderOfOperations.add("Finish" + primitiveTransform.getId()));
+      }
+    };
+    handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
+        .setInstructionId(999L)
+        .setProcessBundle(
+            BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+        .build());
+
+    // Processing of primitive transforms is performed in reverse order.
+    assertThat(transformsProcessed, contains(
+        processBundleDescriptor.getPrimitiveTransform(1),
+        processBundleDescriptor.getPrimitiveTransform(0)));
+    // Start should occur in reverse order while finish calls should occur in forward order
+    assertThat(orderOfOperations, contains("Start3", "Start2", "Finish2", "Finish3"));
+  }
+
+  @Test
+  public void testCreatingPrimitiveTransformExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        .build();
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor);
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient) {
+      @Override
+      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
+          BeamFnApi.PrimitiveTransform primitiveTransform,
+          Supplier<Long> processBundleInstructionId,
+          Function<BeamFnApi.Target,
+                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
+          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Consumer<ThrowingRunnable> addStartFunction,
+          Consumer<ThrowingRunnable> addFinishFunction)
+          throws IOException {
+        thrown.expect(IllegalStateException.class);
+        thrown.expectMessage("TestException");
+        throw new IllegalStateException("TestException");
+      }
+    };
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
+            BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+        .build());
+  }
+
+  @Test
+  public void testPrimitiveTransformStartExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        .build();
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor);
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient) {
+      @Override
+      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
+          BeamFnApi.PrimitiveTransform primitiveTransform,
+          Supplier<Long> processBundleInstructionId,
+          Function<BeamFnApi.Target,
+                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
+          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Consumer<ThrowingRunnable> addStartFunction,
+          Consumer<ThrowingRunnable> addFinishFunction)
+          throws IOException {
+        thrown.expect(IllegalStateException.class);
+        thrown.expectMessage("TestException");
+        addStartFunction.accept(this::throwException);
+      }
+
+      private void throwException() {
+        throw new IllegalStateException("TestException");
+      }
+    };
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
+            BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+        .build());
+  }
+
+  @Test
+  public void testPrimitiveTransformFinishExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
+        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        .build();
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, processBundleDescriptor);
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient) {
+      @Override
+      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
+          BeamFnApi.PrimitiveTransform primitiveTransform,
+          Supplier<Long> processBundleInstructionId,
+          Function<BeamFnApi.Target,
+                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
+          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Consumer<ThrowingRunnable> addStartFunction,
+          Consumer<ThrowingRunnable> addFinishFunction)
+          throws IOException {
+        thrown.expect(IllegalStateException.class);
+        thrown.expectMessage("TestException");
+        addFinishFunction.accept(this::throwException);
+      }
+
+      private void throwException() {
+        throw new IllegalStateException("TestException");
+      }
+    };
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
+            BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+        .build());
+  }
+
+  private static class TestDoFn extends DoFn<String, String> {
+    private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
+    private static final TupleTag<String> sideOutput = new TupleTag<>("sideOutput");
+
+    @StartBundle
+    public void startBundle(Context context) {
+      context.output("StartBundle");
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.output("MainOutput" + context.element());
+      context.sideOutput(sideOutput, "SideOutput" + context.element());
+    }
+
+    @FinishBundle
+    public void finishBundle(Context context) {
+      context.output("FinishBundle");
+    }
+  }
+
+  /**
+   * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs
+   * (mainOutput, sideOutput). Validate that inputs are fed to the {@link DoFn} and that outputs
+   * are directed to the correct consumers.
+   */
+  @Test
+  public void testCreatingAndProcessingDoFn() throws Exception {
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
+    long primitiveTransformId = 100L;
+    long mainOutputId = 101L;
+    long sideOutputId = 102L;
+
+    DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
+        new TestDoFn(),
+        WindowingStrategy.globalDefault(),
+        ImmutableList.of(),
+        STRING_CODER,
+        mainOutputId,
+        ImmutableMap.of(
+            mainOutputId, TestDoFn.mainOutput,
+            sideOutputId, TestDoFn.sideOutput));
+    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
+        .setId(1L)
+        .setUrn(JAVA_DO_FN_URN)
+        .setData(Any.pack(BytesValue.newBuilder()
+            .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
+            .build()))
+        .build();
+    BeamFnApi.Target inputATarget1 = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(1000L)
+        .setName("inputATarget1")
+        .build();
+    BeamFnApi.Target inputATarget2 = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(1001L)
+        .setName("inputATarget1")
+        .build();
+    BeamFnApi.Target inputBTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(1002L)
+        .setName("inputBTarget")
+        .build();
+    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
+        .setId(primitiveTransformId)
+        .setFunctionSpec(functionSpec)
+        .putInputs("inputA", BeamFnApi.Target.List.newBuilder()
+            .addTarget(inputATarget1)
+            .addTarget(inputATarget2)
+            .build())
+        .putInputs("inputB", BeamFnApi.Target.List.newBuilder()
+            .addTarget(inputBTarget)
+            .build())
+        .putOutputs(Long.toString(mainOutputId), BeamFnApi.PCollection.newBuilder()
+            .setCoderReference(STRING_CODER_SPEC_ID)
+            .build())
+        .putOutputs(Long.toString(sideOutputId), BeamFnApi.PCollection.newBuilder()
+            .setCoderReference(STRING_CODER_SPEC_ID)
+            .build())
+        .build();
+
+    List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
+    List<WindowedValue<String>> sideOutputValues = new ArrayList<>();
+    BeamFnApi.Target mainOutputTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(primitiveTransformId)
+        .setName(Long.toString(mainOutputId))
+        .build();
+    BeamFnApi.Target sideOutputTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(primitiveTransformId)
+        .setName(Long.toString(sideOutputId))
+        .build();
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
+        ImmutableMultimap.of(
+            mainOutputTarget, mainOutputValues::add,
+            sideOutputTarget, sideOutputValues::add);
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
+        HashMultimap.create();
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient);
+    handler.createConsumersForPrimitiveTransform(
+        primitiveTransform,
+        Suppliers.ofInstance(57L)::get,
+        existingConsumers::get,
+        newConsumers::put,
+        startFunctions::add,
+        finishFunctions::add);
+
+    Iterables.getOnlyElement(startFunctions).run();
+    assertThat(mainOutputValues, contains(valueInGlobalWindow("StartBundle")));
+    mainOutputValues.clear();
+
+    assertEquals(newConsumers.keySet(),
+        ImmutableSet.of(inputATarget1, inputATarget2, inputBTarget));
+
+    Iterables.getOnlyElement(newConsumers.get(inputATarget1)).accept(valueInGlobalWindow("A1"));
+    Iterables.getOnlyElement(newConsumers.get(inputATarget1)).accept(valueInGlobalWindow("A2"));
+    Iterables.getOnlyElement(newConsumers.get(inputATarget1)).accept(valueInGlobalWindow("B"));
+    assertThat(mainOutputValues, contains(
+        valueInGlobalWindow("MainOutputA1"),
+        valueInGlobalWindow("MainOutputA2"),
+        valueInGlobalWindow("MainOutputB")));
+    assertThat(sideOutputValues, contains(
+        valueInGlobalWindow("SideOutputA1"),
+        valueInGlobalWindow("SideOutputA2"),
+        valueInGlobalWindow("SideOutputB")));
+    mainOutputValues.clear();
+    sideOutputValues.clear();
+
+    Iterables.getOnlyElement(finishFunctions).run();
+    assertThat(mainOutputValues, contains(valueInGlobalWindow("FinishBundle")));
+    mainOutputValues.clear();
+  }
+
+  @Test
+  public void testCreatingAndProcessingSource() throws Exception {
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(LONG_CODER_SPEC_ID, LONG_CODER_SPEC);
+    long primitiveTransformId = 100L;
+    long outputId = 101L;
+
+    BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(1000L)
+        .setName("inputTarget")
+        .build();
+
+    List<WindowedValue<String>> outputValues = new ArrayList<>();
+    BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(primitiveTransformId)
+        .setName(Long.toString(outputId))
+        .build();
+
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
+        ImmutableMultimap.of(outputTarget, outputValues::add);
+    Multimap<BeamFnApi.Target,
+             ThrowingConsumer<WindowedValue<BoundedSource<Long>>>> newConsumers =
+             HashMultimap.create();
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
+        .setId(1L)
+        .setUrn(JAVA_SOURCE_URN)
+        .setData(Any.pack(BytesValue.newBuilder()
+            .setValue(ByteString.copyFrom(
+                SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
+            .build()))
+        .build();
+
+    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
+        .setId(primitiveTransformId)
+        .setFunctionSpec(functionSpec)
+        .putInputs("input",
+            BeamFnApi.Target.List.newBuilder().addTarget(inputTarget).build())
+        .putOutputs(Long.toString(outputId),
+            BeamFnApi.PCollection.newBuilder().setCoderReference(LONG_CODER_SPEC_ID).build())
+        .build();
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient);
+
+    handler.createConsumersForPrimitiveTransform(
+        primitiveTransform,
+        Suppliers.ofInstance(57L)::get,
+        existingConsumers::get,
+        newConsumers::put,
+        startFunctions::add,
+        finishFunctions::add);
+
+    // This is testing a deprecated way of running sources and should be removed
+    // once all source definitions are instead propagated along the input edge.
+    Iterables.getOnlyElement(startFunctions).run();
+    assertThat(outputValues, contains(
+        valueInGlobalWindow(0L),
+        valueInGlobalWindow(1L),
+        valueInGlobalWindow(2L)));
+    outputValues.clear();
+
+    // Check that when passing a source along as an input, the source is processed.
+    assertEquals(newConsumers.keySet(), ImmutableSet.of(inputTarget));
+    Iterables.getOnlyElement(newConsumers.get(inputTarget)).accept(
+        valueInGlobalWindow(CountingSource.upTo(2)));
+    assertThat(outputValues, contains(
+        valueInGlobalWindow(0L),
+        valueInGlobalWindow(1L)));
+
+    assertThat(finishFunctions, empty());
+  }
+
+  @Test
+  public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
+    long bundleId = 57L;
+    long primitiveTransformId = 100L;
+    long outputId = 101L;
+
+    List<WindowedValue<String>> outputValues = new ArrayList<>();
+    BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(primitiveTransformId)
+        .setName(Long.toString(outputId))
+        .build();
+
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
+        ImmutableMultimap.of(outputTarget, outputValues::add);
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
+        HashMultimap.create();
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
+        .setId(1L)
+        .setUrn(DATA_INPUT_URN)
+        .setData(Any.pack(REMOTE_PORT))
+        .build();
+
+    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
+        .setId(primitiveTransformId)
+        .setFunctionSpec(functionSpec)
+        .putInputs("input", BeamFnApi.Target.List.getDefaultInstance())
+        .putOutputs(Long.toString(outputId),
+            BeamFnApi.PCollection.newBuilder().setCoderReference(STRING_CODER_SPEC_ID).build())
+        .build();
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient);
+
+    handler.createConsumersForPrimitiveTransform(
+        primitiveTransform,
+        Suppliers.ofInstance(bundleId)::get,
+        existingConsumers::get,
+        newConsumers::put,
+        startFunctions::add,
+        finishFunctions::add);
+
+    verifyZeroInteractions(beamFnDataClient);
+
+    CompletableFuture<Void> completionFuture = new CompletableFuture<>();
+    when(beamFnDataClient.forInboundConsumer(any(), any(), any(), any()))
+        .thenReturn(completionFuture);
+    Iterables.getOnlyElement(startFunctions).run();
+    verify(beamFnDataClient).forInboundConsumer(
+        eq(REMOTE_PORT.getApiServiceDescriptor()),
+        eq(KV.of(bundleId, BeamFnApi.Target.newBuilder()
+            .setPrimitiveTransformReference(primitiveTransformId)
+            .setName("input")
+            .build())),
+        eq(STRING_CODER),
+        consumerCaptor.capture());
+
+    consumerCaptor.getValue().accept(valueInGlobalWindow("TestValue"));
+    assertThat(outputValues, contains(valueInGlobalWindow("TestValue")));
+    outputValues.clear();
+
+    assertThat(newConsumers.keySet(), empty());
+
+    completionFuture.complete(null);
+    Iterables.getOnlyElement(finishFunctions).run();
+
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
+    Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
+    long bundleId = 57L;
+    long primitiveTransformId = 100L;
+    long outputId = 101L;
+
+    BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
+        .setPrimitiveTransformReference(1000L)
+        .setName("inputTarget")
+        .build();
+
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
+        ImmutableMultimap.of();
+    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
+        HashMultimap.create();
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
+        .setId(1L)
+        .setUrn(DATA_OUTPUT_URN)
+        .setData(Any.pack(REMOTE_PORT))
+        .build();
+
+    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
+        .setId(primitiveTransformId)
+        .setFunctionSpec(functionSpec)
+        .putInputs("input", BeamFnApi.Target.List.newBuilder().addTarget(inputTarget).build())
+        .putOutputs(Long.toString(outputId),
+            BeamFnApi.PCollection.newBuilder().setCoderReference(STRING_CODER_SPEC_ID).build())
+        .build();
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient);
+
+    handler.createConsumersForPrimitiveTransform(
+        primitiveTransform,
+        Suppliers.ofInstance(bundleId)::get,
+        existingConsumers::get,
+        newConsumers::put,
+        startFunctions::add,
+        finishFunctions::add);
+
+    verifyZeroInteractions(beamFnDataClient);
+
+    List<WindowedValue<String>> outputValues = new ArrayList<>();
+    AtomicBoolean wasCloseCalled = new AtomicBoolean();
+    CloseableThrowingConsumer<WindowedValue<String>> outputConsumer =
+        new CloseableThrowingConsumer<WindowedValue<String>>(){
+      @Override
+      public void close() throws Exception {
+        wasCloseCalled.set(true);
+      }
+
+      @Override
+      public void accept(WindowedValue<String> t) throws Exception {
+        outputValues.add(t);
+      }
+    };
+
+    when(beamFnDataClient.forOutboundConsumer(
+        any(),
+        any(),
+        Matchers.<Coder<WindowedValue<String>>>any())).thenReturn(outputConsumer);
+    Iterables.getOnlyElement(startFunctions).run();
+    verify(beamFnDataClient).forOutboundConsumer(
+        eq(REMOTE_PORT.getApiServiceDescriptor()),
+        eq(KV.of(bundleId, BeamFnApi.Target.newBuilder()
+            .setPrimitiveTransformReference(primitiveTransformId)
+            .setName(Long.toString(outputId))
+            .build())),
+        eq(STRING_CODER));
+
+    assertEquals(newConsumers.keySet(), ImmutableSet.of(inputTarget));
+    Iterables.getOnlyElement(newConsumers.get(inputTarget)).accept(
+        valueInGlobalWindow("TestValue"));
+    assertThat(outputValues, contains(valueInGlobalWindow("TestValue")));
+    outputValues.clear();
+
+    assertFalse(wasCloseCalled.get());
+    Iterables.getOnlyElement(finishFunctions).run();
+    assertTrue(wasCloseCalled.get());
+
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
new file mode 100644
index 0000000..7b07a08
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.test.TestExecutors;
+import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RegisterHandler}. */
+@RunWith(JUnit4.class)
+public class RegisterHandlerTest {
+  @Rule public TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+  private static final BeamFnApi.InstructionRequest REGISTER_REQUEST =
+      BeamFnApi.InstructionRequest.newBuilder()
+      .setInstructionId(1L)
+      .setRegister(BeamFnApi.RegisterRequest.newBuilder()
+          .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(1L)
+              .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder().setId(10L)).build()))
+          .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(2L)
+              .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder().setId(20L)).build()))
+          .build())
+      .build();
+  private static final BeamFnApi.InstructionResponse REGISTER_RESPONSE =
+      BeamFnApi.InstructionResponse.newBuilder()
+      .setRegister(RegisterResponse.getDefaultInstance())
+      .build();
+
+  @Test
+  public void testRegistration() throws Exception {
+    RegisterHandler handler = new RegisterHandler();
+    Future<BeamFnApi.InstructionResponse> responseFuture =
+        executor.submit(new Callable<BeamFnApi.InstructionResponse>() {
+          @Override
+          public BeamFnApi.InstructionResponse call() throws Exception {
+            // Purposefully wait a small amount of time making it likely that
+            // a downstream caller needs to block.
+            Thread.sleep(100);
+            return handler.register(REGISTER_REQUEST).build();
+          }
+    });
+    assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0),
+        handler.getById(1L));
+    assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1),
+        handler.getById(2L));
+    assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCoders(0),
+        handler.getById(10L));
+    assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCoders(0),
+        handler.getById(20L));
+    assertEquals(REGISTER_RESPONSE, responseFuture.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
new file mode 100644
index 0000000..64a0e11
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataBufferingOutboundObserver}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataBufferingOutboundObserverTest {
+  private static final int DEFAULT_BUFFER_LIMIT = 1_000_000;
+  private static final KV<Long, BeamFnApi.Target> OUTPUT_LOCATION = KV.of(777L,
+      BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(555L).setName("Test").build());
+  private static final Coder<WindowedValue<byte[]>> CODER =
+      LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(ByteArrayCoder.of()));
+
+  @Test
+  public void testWithDefaultBuffer() throws Exception {
+    Collection<BeamFnApi.Elements> values = new ArrayList<>();
+    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    CloseableThrowingConsumer<WindowedValue<byte[]>> consumer =
+        new BeamFnDataBufferingOutboundObserver<>(
+        PipelineOptionsFactory.create(),
+        OUTPUT_LOCATION,
+        CODER,
+        TestStreams.withOnNext(values::add)
+            .withOnCompleted(() -> onCompletedWasCalled.set(true))
+            .build());
+
+    // Test that nothing is emitted till the default buffer size is surpassed.
+    consumer.accept(valueInGlobalWindow(new byte[DEFAULT_BUFFER_LIMIT - 50]));
+    assertThat(values, empty());
+
+    // Test that when we cross the buffer, we emit.
+    consumer.accept(valueInGlobalWindow(new byte[50]));
+    assertEquals(
+        messageWithData(new byte[DEFAULT_BUFFER_LIMIT - 50], new byte[50]),
+        Iterables.get(values, 0));
+
+    // Test that nothing is emitted till the default buffer size is surpassed after a reset
+    consumer.accept(valueInGlobalWindow(new byte[DEFAULT_BUFFER_LIMIT - 50]));
+    assertEquals(1, values.size());
+
+    // Test that when we cross the buffer, we emit.
+    consumer.accept(valueInGlobalWindow(new byte[50]));
+    assertEquals(
+        messageWithData(new byte[DEFAULT_BUFFER_LIMIT - 50], new byte[50]),
+        Iterables.get(values, 1));
+
+    // Test that when we close with an empty buffer we only have one end of stream
+    consumer.close();
+    assertEquals(messageWithData(),
+        Iterables.get(values, 2));
+  }
+
+  @Test
+  public void testExperimentConfiguresBufferLimit() throws Exception {
+    Collection<BeamFnApi.Elements> values = new ArrayList<>();
+    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    CloseableThrowingConsumer<WindowedValue<byte[]>> consumer =
+        new BeamFnDataBufferingOutboundObserver<>(
+        PipelineOptionsFactory.fromArgs(
+            new String[] { "--experiments=beam_fn_api_data_buffer_limit=100" }).create(),
+        OUTPUT_LOCATION,
+        CODER,
+        TestStreams.withOnNext(values::add)
+            .withOnCompleted(() -> onCompletedWasCalled.set(true))
+            .build());
+
+    // Test that nothing is emitted till the default buffer size is surpassed.
+    consumer.accept(valueInGlobalWindow(new byte[51]));
+    assertThat(values, empty());
+
+    // Test that when we cross the buffer, we emit.
+    consumer.accept(valueInGlobalWindow(new byte[49]));
+    assertEquals(
+        messageWithData(new byte[51], new byte[49]),
+        Iterables.get(values, 0));
+
+    // Test that when we close we empty the value, and then the stream terminator as part
+    // of the same message
+    consumer.accept(valueInGlobalWindow(new byte[1]));
+    consumer.close();
+    assertEquals(
+        BeamFnApi.Elements.newBuilder(messageWithData(new byte[1]))
+            .addData(BeamFnApi.Elements.Data.newBuilder()
+                .setInstructionReference(OUTPUT_LOCATION.getKey())
+                .setTarget(OUTPUT_LOCATION.getValue()))
+            .build(),
+        Iterables.get(values, 1));
+  }
+
+  private static BeamFnApi.Elements messageWithData(byte[] ... datum) throws IOException {
+    ByteString.Output output = ByteString.newOutput();
+    for (byte[] data : datum) {
+      CODER.encode(valueInGlobalWindow(data), output, Context.NESTED);
+    }
+    return BeamFnApi.Elements.newBuilder()
+        .addData(BeamFnApi.Elements.Data.newBuilder()
+            .setInstructionReference(OUTPUT_LOCATION.getKey())
+            .setTarget(OUTPUT_LOCATION.getValue())
+            .setData(output.toByteString()))
+        .build();
+  }
+}