You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/30 20:48:56 UTC

[5/6] beam git commit: A proposal for a portability framework to execute user definable functions.

A proposal for a portability framework to execute user definable functions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4b2bec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4b2bec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4b2bec

Branch: refs/heads/master
Commit: 0b4b2becb45b9f637ba31f599ebe8be0331bd633
Parents: 582c4a8
Author: Luke Cwik <lc...@google.com>
Authored: Thu Jan 19 15:16:55 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jan 30 12:47:55 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |  36 +-
 runners/apex/pom.xml                            |   2 +-
 sdks/common/fn-api/pom.xml                      | 111 +++
 .../fn-api/src/main/proto/beam_fn_api.proto     | 771 +++++++++++++++++++
 sdks/common/pom.xml                             |  38 +
 .../src/main/resources/beam/findbugs-filter.xml |  32 +-
 sdks/java/harness/pom.xml                       | 167 ++++
 .../org/apache/beam/fn/harness/FnHarness.java   | 131 ++++
 .../harness/channel/ManagedChannelFactory.java  |  80 ++
 .../harness/channel/SocketAddressFactory.java   |  64 ++
 .../beam/fn/harness/channel/package-info.java   |  22 +
 .../fn/harness/control/BeamFnControlClient.java | 165 ++++
 .../harness/control/ProcessBundleHandler.java   | 334 ++++++++
 .../fn/harness/control/RegisterHandler.java     |  92 +++
 .../beam/fn/harness/control/package-info.java   |  22 +
 .../BeamFnDataBufferingOutboundObserver.java    | 135 ++++
 .../beam/fn/harness/data/BeamFnDataClient.java  |  64 ++
 .../fn/harness/data/BeamFnDataGrpcClient.java   | 122 +++
 .../harness/data/BeamFnDataGrpcMultiplexer.java | 140 ++++
 .../harness/data/BeamFnDataInboundObserver.java |  81 ++
 .../beam/fn/harness/data/package-info.java      |  22 +
 .../fn/harness/fake/FakeAggregatorFactory.java  |  52 ++
 .../beam/fn/harness/fake/FakeStepContext.java   |  70 ++
 .../beam/fn/harness/fake/package-info.java      |  22 +
 .../harness/fn/CloseableThrowingConsumer.java   |  23 +
 .../beam/fn/harness/fn/ThrowingBiFunction.java  |  32 +
 .../beam/fn/harness/fn/ThrowingConsumer.java    |  32 +
 .../beam/fn/harness/fn/ThrowingFunction.java    |  32 +
 .../beam/fn/harness/fn/ThrowingRunnable.java    |  30 +
 .../apache/beam/fn/harness/fn/package-info.java |  22 +
 .../fn/harness/logging/BeamFnLoggingClient.java | 308 ++++++++
 .../beam/fn/harness/logging/package-info.java   |  22 +
 .../apache/beam/fn/harness/package-info.java    |  22 +
 .../beam/fn/harness/stream/AdvancingPhaser.java |  36 +
 .../harness/stream/BufferingStreamObserver.java | 166 ++++
 .../fn/harness/stream/DirectStreamObserver.java |  71 ++
 .../ForwardingClientResponseObserver.java       |  63 ++
 .../harness/stream/StreamObserverFactory.java   |  91 +++
 .../beam/fn/harness/stream/package-info.java    |  22 +
 .../beam/runners/core/BeamFnDataReadRunner.java | 104 +++
 .../runners/core/BeamFnDataWriteRunner.java     |  87 +++
 .../beam/runners/core/BoundedSourceRunner.java  | 105 +++
 .../apache/beam/runners/core/package-info.java  |  22 +
 .../apache/beam/fn/harness/FnHarnessTest.java   | 130 ++++
 .../channel/ManagedChannelFactoryTest.java      |  74 ++
 .../channel/SocketAddressFactoryTest.java       |  56 ++
 .../control/BeamFnControlClientTest.java        | 182 +++++
 .../control/ProcessBundleHandlerTest.java       | 674 ++++++++++++++++
 .../fn/harness/control/RegisterHandlerTest.java |  80 ++
 ...BeamFnDataBufferingOutboundObserverTest.java | 142 ++++
 .../harness/data/BeamFnDataGrpcClientTest.java  | 309 ++++++++
 .../data/BeamFnDataGrpcMultiplexerTest.java     |  96 +++
 .../data/BeamFnDataInboundObserverTest.java     | 116 +++
 .../logging/BeamFnLoggingClientTest.java        | 169 ++++
 .../fn/harness/stream/AdvancingPhaserTest.java  |  48 ++
 .../stream/BufferingStreamObserverTest.java     | 146 ++++
 .../stream/DirectStreamObserverTest.java        | 139 ++++
 .../ForwardingClientResponseObserverTest.java   |  60 ++
 .../stream/StreamObserverFactoryTest.java       |  84 ++
 .../beam/fn/harness/test/TestExecutors.java     |  85 ++
 .../beam/fn/harness/test/TestExecutorsTest.java | 160 ++++
 .../beam/fn/harness/test/TestStreams.java       | 162 ++++
 .../beam/fn/harness/test/TestStreamsTest.java   |  84 ++
 .../runners/core/BeamFnDataReadRunnerTest.java  | 187 +++++
 .../runners/core/BeamFnDataWriteRunnerTest.java | 155 ++++
 .../runners/core/BoundedSourceRunnerTest.java   | 113 +++
 sdks/java/pom.xml                               |   1 +
 sdks/pom.xml                                    |   1 +
 68 files changed, 7514 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d09bf59..a53453b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
     <google-clients.version>1.22.0</google-clients.version>
     <google-cloud-bigdataoss.version>1.4.5</google-cloud-bigdataoss.version>
     <google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
-    <guava.version>19.0</guava.version>
+    <guava.version>20.0</guava.version>
     <grpc.version>1.0.1</grpc.version>
     <hamcrest.version>1.3</hamcrest.version>
     <jackson.version>2.7.2</jackson.version>
@@ -127,7 +127,7 @@
     <mockito.version>1.9.5</mockito.version>
     <netty.version>4.1.3.Final</netty.version>
     <os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version>
-    <protobuf.version>3.0.0</protobuf.version>
+    <protobuf.version>3.1.0</protobuf.version>
     <pubsub.version>v1-rev10-1.22.0</pubsub.version>
     <slf4j.version>1.7.14</slf4j.version>
     <stax2.version>3.1.4</stax2.version>
@@ -314,6 +314,11 @@
 
   <dependencyManagement>
     <dependencies>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-common-fn-api</artifactId>
+        <version>${project.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.apache.beam</groupId>
@@ -729,6 +734,13 @@
       </dependency>
 
       <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-transport-native-epoll</artifactId>
+        <version>${netty.version}</version>
+        <classifier>linux-x86_64</classifier>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
         <version>${avro.version}</version>
@@ -741,6 +753,12 @@
       </dependency>
 
       <dependency>
+        <groupId>com.google.errorprone</groupId>
+        <artifactId>error_prone_annotations</artifactId>
+        <version>2.0.13</version>
+      </dependency>
+
+      <dependency>
         <groupId>joda-time</groupId>
         <artifactId>joda-time</artifactId>
         <version>${joda.version}</version>
@@ -824,9 +842,23 @@
   </dependencyManagement>
 
   <build>
+    <extensions>
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>${os-maven-plugin.version}</version>
+      </extension>
+    </extensions>
+
     <pluginManagement>
       <plugins>
         <plugin>
+          <groupId>org.xolstice.maven.plugins</groupId>
+          <artifactId>protobuf-maven-plugin</artifactId>
+          <version>0.5.0</version>
+        </plugin>
+
+        <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-checkstyle-plugin</artifactId>
           <version>2.17</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 7ae07e2..5e16083 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -226,7 +226,7 @@
                 <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:20.0</ignoredUsedUndeclaredDependency>
               </ignoredUsedUndeclaredDependencies>
             </configuration>
           </execution>

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
new file mode 100644
index 0000000..72788d0
--- /dev/null
+++ b/sdks/common/fn-api/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <packaging>jar</packaging>
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-common-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-common-fn-api</artifactId>
+  <name>Apache Beam :: SDKs :: Common :: Fn API</name>
+  <description>This artifact generates the stub bindings.</description>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+      <resource>
+        <directory>${project.build.directory}/original_sources_to_package</directory>
+      </resource>
+    </resources>
+
+    <plugins>
+      <!-- Skip the checkstyle plugin on generated code -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <!-- Skip the findbugs plugin on generated code -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
new file mode 100644
index 0000000..3ac0fbf
--- /dev/null
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Fn API and bootstrapping.
+ *
+ * TODO: Usage of plural names in lists looks awkward in Java
+ * e.g. getOutputsMap, addCodersBuilder
+ *
+ * TODO: gRPC / proto field names conflict with generated code
+ * e.g. "class" in java, "output" in python
+ */
+
+syntax = "proto3";
+
+/* TODO: Consider consolidating common components in another package
+ * and language namespaces for re-use with Runner Api.
+ */
+
+package org.apache.beam.fn.v1;
+
+option java_package = "org.apache.beam.fn.v1";
+option java_outer_classname = "BeamFnApi";
+
+import "google/protobuf/any.proto";
+import "google/protobuf/timestamp.proto";
+
+/*
+ * Constructs that define the pipeline shape.
+ *
+ * These are mostly unstable due to the missing pieces to be shared with
+ * the Runner Api like windowing strategy, display data, .... There are still
+ * some modelling questions related to whether a side input is modelled
+ * as another field on a PrimitiveTransform or as part of inputs and we
+ * still are missing things like the CompositeTransform.
+ */
+
+// A representation of an input or output definition on a primitive transform.
+// Stable
+message Target {
+  // A repeated list of target definitions.
+  message List {
+    repeated Target target = 1;
+  }
+
+  // (Required) The id of the PrimitiveTransform which is the target.
+  int64 primitive_transform_reference = 1;
+
+  // (Required) The local name of an input or output defined on the primitive
+  // transform.
+  string name = 2;
+}
+
+// Information defining a PCollection
+message PCollection {
+  // (Required) A reference to a coder.
+  int64 coder_reference = 1;
+
+  // TODO: Windowing strategy, ...
+}
+
+// A primitive transform within Apache Beam.
+message PrimitiveTransform {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  int64 id = 1;
+
+  // (Required) A function spec that is used by this primitive
+  // transform to process data.
+  FunctionSpec function_spec = 2;
+
+  // A map of distinct input names to target definitions.
+  // For example, in CoGbk this represents the tag name associated with each
+  // distinct input name and a list of primitive transforms that are associated
+  // with the specified input.
+  map<string, Target.List> inputs = 3;
+
+  // A map from local output name to PCollection definitions. For example, in
+  // DoFn this represents the tag name associated with each distinct output.
+  map<string, PCollection> outputs = 4;
+
+  // TODO: Should we model side inputs as a special type of input for a
+  // primitive transform or should it be modeled as the relationship that
+  // the predecessor input will be a view primitive transform.
+  // A map from side input names to side inputs.
+  map<string, SideInput> side_inputs = 5;
+
+  // The user name of this step.
+  // TODO: This should really be in display data and not at this level
+  string step_name = 6;
+}
+
+/*
+ * User Definable Functions
+ *
+ * This is still unstable mainly due to how we model the side input.
+ */
+
+// Defines the common elements of user-definable functions, to allow the SDK to
+// express the information the runner needs to execute work.
+// Stable
+message FunctionSpec {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  int64 id = 1;
+
+  // (Required) A globally unique name representing this user definable
+  // function.
+  //
+  // User definable functions use the urn encodings registered such that another
+  // may implement the user definable function within another language.
+  //
+  // For example:
+  //    urn:org.apache.beam:coder:kv:1.0
+  string urn = 2;
+
+  // (Required) Reference to specification of execution environment required to
+  // invoke this function.
+  int64 environment_reference = 3;
+
+  // Data used to parameterize this function. Depending on the urn, this may be
+  // optional or required.
+  google.protobuf.Any data = 4;
+}
+
+message SideInput {
+  // TODO: Coder?
+
+  // For RunnerAPI.
+  Target input = 1;
+
+  // For FnAPI.
+  FunctionSpec view_fn = 2;
+}
+
+// Defines how to encode values into byte streams and decode values from byte
+// streams. A coder can be parameterized by additional properties which may or
+// may not be language agnostic.
+//
+// Coders using the urn:org.apache.beam:coder namespace must have their
+// encodings registered such that another may implement the encoding within
+// another language.
+//
+// For example:
+//    urn:org.apache.beam:coder:kv:1.0
+//    urn:org.apache.beam:coder:iterable:1.0
+// Stable
+message Coder {
+  // TODO: This looks weird when compared to the other function specs
+  // which use URN to differentiate themselves. Should "Coder" be embedded
+  // inside the FunctionSpec data block.
+
+  // The data associated with this coder used to reconstruct it.
+  FunctionSpec function_spec = 1;
+
+  // A list of component coder references.
+  //
+  // For a key-value coder, there must be exactly two component coder references
+  // where the first reference represents the key coder and the second reference
+  // is the value coder.
+  //
+  // For an iterable coder, there must be exactly one component coder reference
+  // representing the value coder.
+  //
+  // TODO: Perhaps this is redundant with the data of the FunctionSpec
+  // for known coders?
+  repeated int64 component_coder_reference = 2;
+}
+
+// A descriptor for connecting to a remote port using the Beam Fn Data API.
+// Allows for communication between two environments (for example between the
+// runner and the SDK).
+// Stable
+message RemoteGrpcPort {
+  // (Required) An API descriptor which describes where to
+  // connect to including any authentication that is required.
+  ApiServiceDescriptor api_service_descriptor = 1;
+}
+
+/*
+ * Control Plane API
+ *
+ * Progress reporting and splitting still need further vetting. Also, this may change
+ * with the addition of new types of instructions/responses related to metrics.
+ */
+
+// An API that describes the work that a SDK Fn Harness is meant to do.
+// Stable
+service BeamFnControl {
+  // Instructions sent by the runner to the SDK requesting different types
+  // of work.
+  rpc Control(
+    // A stream of responses to instructions the SDK was asked to perform.
+    stream InstructionResponse
+  ) returns (
+    // A stream of instructions requested of the SDK to be performed.
+    stream InstructionRequest
+  ) {}
+}
+
+// A request sent by a runner which the SDK is asked to fulfill.
+// Stable
+message InstructionRequest {
+  // (Required) A unique identifier provided by the runner which represents
+  // this request's execution. The InstructionResponse MUST have the matching id.
+  int64 instruction_id = 1;
+
+  // (Required) A request that the SDK Harness needs to interpret.
+  oneof request {
+    RegisterRequest register = 1000;
+    ProcessBundleRequest process_bundle = 1001;
+    ProcessBundleProgressRequest process_bundle_progress = 1002;
+    ProcessBundleSplitRequest process_bundle_split = 1003;
+  }
+}
+
+// The response for an associated request the SDK had been asked to fulfill.
+// Stable
+message InstructionResponse {
+  // (Required) A reference provided by the runner which represents a request's
+  // execution. The InstructionResponse MUST have the matching id when
+  // responding to the runner.
+  int64 instruction_id = 1;
+
+  // If this is specified, then this instruction has failed.
+  // A human readable string representing the reason as to why processing has
+  // failed.
+  string error = 2;
+
+  // If the instruction did not fail, it is required to return an equivalent
+  // response type depending on the request this matches.
+  oneof response {
+    RegisterResponse register = 1000;
+    ProcessBundleResponse process_bundle = 1001;
+    ProcessBundleProgressResponse process_bundle_progress = 1002;
+    ProcessBundleSplitResponse process_bundle_split = 1003;
+  }
+}
+
+// A list of objects which can be referred to by the runner in
+// future requests.
+// Stable
+message RegisterRequest {
+  // (Optional) The set of descriptors used to process bundles.
+  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
+}
+
+// Stable
+message RegisterResponse {
+}
+
+// A descriptor of references used when processing a bundle.
+// Stable
+message ProcessBundleDescriptor {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  int64 id = 1;
+
+  // (Required) A list of primitive transforms that should
+  // be used to construct the bundle processing graph.
+  repeated PrimitiveTransform primitive_transform = 2;
+
+  // (Required) The set of all coders referenced in this bundle.
+  repeated Coder coders = 4;
+}
+
+// A request to process a given bundle.
+// Stable
+message ProcessBundleRequest {
+  int64 process_bundle_descriptor_reference = 1;
+}
+
+// Stable
+message ProcessBundleResponse {
+}
+
+message ProcessBundleProgressRequest {
+  // (Required) A reference to an active process bundle request with the given
+  // instruction id.
+  int64 instruction_reference = 1;
+}
+
+message ProcessBundleProgressResponse {
+  // (Required) The finished amount of work. A monotonically increasing
+  // unitless measure of work finished.
+  double finished_work = 1;
+
+  // (Required) The known amount of backlog for the process bundle request.
+  // Computed as:
+  //   (estimated known work - finished work) / finished work
+  double backlog = 2;
+}
+
+message ProcessBundleSplitRequest {
+  // (Required) A reference to an active process bundle request with the given
+  // instruction id.
+  int64 instruction_reference = 1;
+
+  // (Required) The fraction of work (when compared to the known amount of work)
+  // the process bundle request should try to split at.
+  double fraction = 2;
+}
+
+// urn:org.apache.beam:restriction:element-count:1.0
+message ElementCountRestriction {
+  // A restriction representing the number of elements that should be processed.
+  // Effectively the range [0, count]
+  int64 count = 1;
+}
+
+// urn:org.apache.beam:restriction:element-count-skip:1.0
+message ElementCountSkipRestriction {
+  // A restriction representing the number of elements that should be skipped.
+  // Effectively the range (count, infinity]
+  int64 count = 1;
+}
+
+// Each primitive transform that is splittable is defined by a restriction
+// it is currently processing. During splitting, that currently active
+// restriction (R_initial) is split into 2 components:
+//   * a restriction (R_done) representing all elements that will be fully
+//     processed
+//   * a restriction (R_todo) representing all elements that will not be fully
+//     processed
+//
+// where:
+//   R_initial = R_done \u22c3 R_todo
+message PrimitiveTransformSplit {
+  // (Required) A reference to a primitive transform with the given id that
+  // is part of the active process bundle request with the given instruction
+  // id.
+  int64 primitive_transform_reference = 1;
+
+  // (Required) A function specification describing the restriction
+  // that has been completed by the primitive transform.
+  //
+  // For example, a remote GRPC source will have a specific urn and data
+  // block containing an ElementCountRestriction.
+  FunctionSpec completed_restriction = 2;
+
+  // (Required) A function specification describing the restriction
+  // representing the remainder of work for the primitive transform.
+  //
+  // For example, a remote GRPC source will have a specific urn and data
+  // block containing an ElementCountSkipRestriction.
+  FunctionSpec remaining_restriction = 3;
+}
+
+message ProcessBundleSplitResponse {
+  // (Optional) A set of split responses for a currently active work item.
+  //
+  // If primitive transform B is a descendant of primitive transform A and both
+  // A and B report a split. Then B's restriction is reported as an element
+  // restriction pair and thus the fully reported restriction is:
+  //   R = A_done
+  //     \u22c3 (A_boundary \u22c2 B_done)
+  //     \u22c3 (A_boundary \u22c2 B_todo)
+  //     \u22c3 A_todo
+  // If there is a descendant of B named C, then C would similarly report a
+  // set of element pair restrictions.
+  //
+  // This restriction is processed and completed by the currently active process
+  // bundle request:
+  //   A_done \u22c3 (A_boundary \u22c2 B_done)
+  // and these restrictions will be processed by future process bundle requests:
+  //   A_boundary \u22c2 B_todo (passed to SDF B directly)
+  //   A_todo (passed to SDF A directly)
+
+  // If primitive transform B and C are siblings and descendants of A and A, B,
+  // and C report a split. Then B and C's restrictions are relative to A's.
+  //   R = A_done
+  //     \u22c3 (A_boundary \u22c2 B_done)
+  //     \u22c3 (A_boundary \u22c2 B_todo)
+  //     \u22c3 (A_boundary \u22c2 C_done)
+  //     \u22c3 (A_boundary \u22c2 C_todo)
+  //     \u22c3 A_todo
+  // If there is no descendant of B or C also reporting a split, then
+  //   B_boundary = \u2205 and C_boundary = \u2205
+  //
+  // This restriction is processed and completed by the currently active process
+  // bundle request:
+  //   A_done \u22c3 (A_boundary \u22c2 B_done)
+  //          \u22c3 (A_boundary \u22c2 C_done)
+  // and these restrictions will be processed by future process bundle requests:
+  //   A_boundary \u22c2 B_todo (passed to SDF B directly)
+  //   A_boundary \u22c2 C_todo (passed to SDF C directly)
+  //   A_todo (passed to SDF A directly)
+  //
+  // Note that descendants splits should only be reported if it is inexpensive
+  // to compute the boundary restriction intersected with descendants splits.
+  // Also note, that the boundary restriction may represent a set of elements
+  // produced by a parent primitive transform which can not be split at each
+  // element or that there are intermediate unsplittable primitive transforms
+  // between an ancestor splittable function and a descendant splittable
+  // function which may have more than one output per element. Finally note
+  // that the descendant splits should only be reported if the split
+  // information is relatively compact.
+  repeated PrimitiveTransformSplit splits = 1;
+}
+
+/*
+ * Data Plane API
+ */
+
+// Messages used to represent logical byte streams.
+// Stable
+message Elements {
+  // Represents multiple encoded elements in nested context for a given named
+  // instruction and target.
+  message Data {
+    // (Required) A reference to an active instruction request with the given
+    // instruction id.
+    int64 instruction_reference = 1;
+
+    // (Required) A definition representing a consumer or producer of this data.
+    // If received by a harness, this represents the consumer within that
+    // harness that should consume these bytes. If sent by a harness, this
+    // represents the producer of these bytes.
+    //
+    // Note that a single element may span multiple Data messages.
+    //
+    // Note that a sending/receiving pair should share the same target
+    // identifier.
+    Target target = 2;
+
+    // (Optional) Represents a part of a logical byte stream. Elements within
+    // the logical byte stream are encoded in the nested context and
+    // concatenated together.
+    //
+    // An empty data block represents the end of stream for the given
+    // instruction and target.
+    bytes data = 3;
+  }
+
+  // (Required) A list containing parts of logical byte streams.
+  repeated Data data = 1;
+}
+
+// Stable
+service BeamFnData {
+  // Used to send data between harnesses.
+  rpc Data(
+    // A stream of data representing input.
+    stream Elements
+  ) returns (
+    // A stream of data representing output.
+    stream Elements
+  ) {}
+}
+
+/*
+ * State API
+ *
+ * This is just a high level sketch of how this could work. There is still
+ * a lot of work with respect to how the key spaces for the different types
+ * of access required (side inputs, user state, ...) and how state caching
+ * works across bundles.
+ */
+
+message StateRequest {
+  // (Required) A unique identifier provided by the SDK which represents this
+  // request's execution. The StateResponse must have the matching id.
+  int64 id = 1;
+
+  // (Required) The associated instruction id of the work that is currently
+  // being processed. This allows for the runner to associate any modifications
+  // to state to be committed with the appropriate work execution.
+  int64 instruction_reference = 2;
+
+  // At least one of the following fields should be populated.
+  // Also, no request should use a state key referred to in another state key.
+
+  // (Optional) A request to get state.
+  repeated StateGetRequest get = 3;
+
+  // (Optional) A request to append to state.
+  repeated StateAppendRequest append = 4;
+
+  // (Optional) A request to clear state.
+  repeated StateClearRequest clear = 5;
+}
+
+message StateResponse {
+  // (Required) A reference provided by the SDK which represents a request's
+  // execution. The StateResponse must have the matching id when responding
+  // to the SDK.
+  int64 id = 1;
+
+  // (Required) The associated instruction id of the work that is currently
+  // being processed.
+  int64 instruction_reference = 2;
+
+  // (Required) A key to associate with the version of this state. Allows for
+  // SDKs to share state across work items if they have the same cache key and
+  // state key.
+  bytes cache_key = 3;
+
+  // (Optional) If this is specified, then the state request has failed.
+  // A human readable string representing the reason as to why the request
+  // failed.
+  string error = 4;
+
+  // For every field populated in the StateRequest, there is a matching field in
+  // the StateResponse.
+
+  // (Optional) A response to getting state.
+  repeated StateGetResponse get = 5;
+
+  // (Optional) A response to appending to state.
+  repeated StateAppendResponse append = 6;
+
+  // (Optional) A response to clearing state.
+  repeated StateClearResponse clear = 7;
+}
+
+service BeamFnState {
+  // Used to get/append/clear state stored by the runner on behalf of the SDK.
+  rpc State(
+    // A stream of state instructions requested of the runner.
+    stream StateRequest
+  ) returns (
+    // A stream of responses to state instructions the runner was asked to
+    // perform.
+    stream StateResponse
+  ) {}
+}
+
+
+// TODO: Resolve with the other State API.
+service SimpleBeamFnState {
+  // Gets the elements associated with the given key.
+  rpc Get(StateKey) returns (Elements.Data) {}
+  // Appends elements to a given state bag.
+  rpc Append(SimpleStateAppendRequest) returns (Empty) {}
+  // Clears a given state bag.
+  rpc Clear(StateKey) returns (Empty) {}
+}
+
+message Empty {
+}
+
+message SimpleStateAppendRequest {
+  StateKey state_key = 1;
+  repeated bytes data = 2;
+}
+
+message StateKey {
+  // (Required) Represents the namespace for the state. If this state is for a
+  // DoFn, then this reference is expected to point to the DoFn. If this state
+  // is for a side input, then this is expected to reference the ViewFn.
+  int64 function_spec_reference = 1;
+
+  // (Required) The bytes of the window which this state request is for encoded
+  // in the outer context.
+  bytes window = 2;
+
+  // (Required) The user key for which the value was encoded in the outer
+  // context.
+  bytes key = 3;
+}
+
+message StateKeyOrIterable {
+  // Exactly one of the two fields below is required to be set.
+  // If state key is set, then the State API should be invoked to fetch the
+  // values allowing one to restart the iterable. Otherwise the bytes for the
+  // entire iterable are represented and should be decoded using an iterable
+  // coder using the outer context.
+  StateKey state_key = 1;
+  repeated bytes iterable = 2;
+}
+
+// A request to get state for the given state key.
+message StateGetRequest {
+  // A state key encoded in the outer context.
+  StateKey state_key = 1;
+}
+
+// A response to get state for the given state key.
+message StateGetResponse {
+  // A state key encoded in the outer context.
+  StateKey state_key = 1;
+
+  oneof state {
+    // A description of an input port which will stream the state data.
+    RemoteGrpcPort remote_grpc_port = 1000;
+  }
+}
+
+// A request to append state for the given state key.
+message StateAppendRequest {
+  // A state key encoded in the outer context.
+  StateKey state_key  = 1;
+}
+
+// A response to append state for the given state key.
+message StateAppendResponse {
+  // A state key encoded in the outer context.
+  StateKey state_key = 1;
+
+  oneof state {
+    // A description of an output port which to stream the state data to.
+    RemoteGrpcPort remote_grpc_port = 1000;
+  }
+}
+
+// A request to clear state for the given state key.
+message StateClearRequest {
+  // A state key encoded in the outer context.
+  StateKey state_key = 1;
+}
+
+// A response to clear state for the given state key.
+message StateClearResponse {
+}
+
+/*
+ * Logging API
+ *
+ * This is very stable. There can be some changes to how we define a LogEntry,
+ * to increase/decrease the severity types, the way we format an exception/stack
+ * trace, or the log site.
+ */
+
+// A log entry
+message LogEntry {
+  // A list of log entries, enables buffering and batching of multiple
+  // log messages using the logging API.
+  message List {
+    // (Required) One or more log messages.
+    repeated LogEntry log_entries = 1;
+  }
+
+  // The severity of the event described in a log entry, expressed as one of the
+  // severity levels listed below. For your reference, the levels are
+  // assigned the listed numeric values. The effect of using numeric values
+  // other than those listed is undefined.
+  //
+  // If you are writing log entries, you should map other severity encodings to
+  // one of these standard levels. For example, you might map all of
+  // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
+  //
+  // This list is intentionally not comprehensive; the intent is to provide a
+  // common set of "good enough" severity levels so that logging front ends
+  // can provide filtering and searching across log types. Users of the API are
+  // free not to use all severity levels in their log messages.
+  enum Severity {
+    // Trace level information, also the default log level unless
+    // another severity is specified.
+    TRACE = 0;
+    // Debugging information.
+    DEBUG = 10;
+    // Normal events.
+    INFO = 20;
+    // Normal but significant events, such as start up, shut down, or
+    // configuration.
+    NOTICE = 30;
+    // Warning events might cause problems.
+    WARN = 40;
+    // Error events are likely to cause problems.
+    ERROR = 50;
+    // Critical events cause severe problems or brief outages and may
+    // indicate that a person must take action.
+    CRITICAL = 60;
+  }
+
+  // (Required) The severity of the log statement.
+  Severity severity = 1;
+
+  // (Required) The time at which this log statement occurred.
+  google.protobuf.Timestamp timestamp = 2;
+
+  // (Required) A human readable message.
+  string message = 3;
+
+  // (Optional) An optional trace of the functions involved. For example, in
+  // Java this can include multiple causes and multiple suppressed exceptions.
+  string trace = 4;
+
+  // (Optional) A reference to the instruction this log statement is associated
+  // with.
+  int64 instruction_reference = 5;
+
+  // (Optional) A reference to the primitive transform this log statement is
+  // associated with.
+  int64 primitive_transform_reference = 6;
+
+  // (Optional) Human-readable name of the function or method being invoked,
+  // with optional context such as the class or package name. The format can
+  // vary by language. For example:
+  //   qual.if.ied.Class.method (Java)
+  //   dir/package.func (Go)
+  //   module.function (Python)
+  //   file.cc:382 (C++)
+  string log_location = 7;
+
+  // (Optional) The name of the thread this log statement is associated with.
+  string thread = 8;
+}
+
+message LogControl {
+}
+
+// Stable
+service BeamFnLogging {
+  // Allows for the SDK to emit log entries which the runner can
+  // associate with the active job.
+  rpc Logging(
+    // A stream of log entries batched into lists emitted by the SDK harness.
+    stream LogEntry.List
+  ) returns (
+    // A stream of log control messages used to configure the SDK.
+    stream LogControl
+  ) {}
+}
+
+/*
+ * Environment types
+ */
+message ApiServiceDescriptor {
+  // (Required) A pipeline level unique id which can be used to refer to
+  // this message.
+  int64 id = 1;
+
+  // (Required) The URL to connect to.
+  string url = 2;
+
+  // (Optional) The method for authentication. If unspecified, access to the
+  // url is already being performed in a trusted context (e.g. localhost,
+  // private network).
+  oneof authentication {
+    OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3;
+  }
+}
+
+message OAuth2ClientCredentialsGrant {
+  // (Required) The URL to submit a "client_credentials" grant type request for
+  // an OAuth access token which will be used as a bearer token for requests.
+  string url = 1;
+}
+
+// A Docker container configuration for launching the SDK Fn Harness to execute
+// user specified functions.
+message DockerContainer {
+  // (Required) A pipeline level unique id which can be used to refer to
+  // this message.
+  int64 id = 1;
+
+  // (Required) The Docker container URI
+  // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
+  string uri = 2;
+
+  // (Optional) Docker registry specification.
+  // If unspecified, the uri is expected to be able to be fetched without
+  // requiring additional configuration by a runner.
+  int64 registry_reference = 3;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
new file mode 100644
index 0000000..8364d9a
--- /dev/null
+++ b/sdks/common/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-common-parent</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: SDKs :: Common</name>
+
+  <modules>
+    <module>fn-api</module>
+  </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 35b5ed3..91ab9be 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -34,12 +34,42 @@
           unapproved artifact license.
         -->
   <Match>
+    <Class name="org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver"/>
+    <Method name="onCompleted"/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+    <!--
+    Findbugs limitation when using Void typed CompletableFuture. This is a limitation of FindBugs as discussed here:
+    https://github.com/findbugsproject/findbugs/issues/79
+    -->
+  </Match>
+
+  <Match>
+    <Class name="org.apache.beam.fn.harness.data.BeamFnDataInboundObserver"/>
+    <Method name="accept"/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+    <!--
+    Findbugs limitation when using Void typed CompletableFuture. This is a limitation of FindBugs as discussed here:
+    https://github.com/findbugsproject/findbugs/issues/79
+    -->
+  </Match>
+
+  <Match>
+    <Class name="org.apache.beam.fn.harness.logging.BeamFnLoggingClient$LogControlObserver"/>
+    <Method name="onCompleted"/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+    <!--
+    Findbugs limitation when using Void typed CompletableFuture. This is a limitation of FindBugs as discussed here:
+    https://github.com/findbugsproject/findbugs/issues/79
+    -->
+  </Match>
+
+  <Match>
     <Class name="org.apache.beam.sdk.coders.AvroCoder$SerializableSchemaSupplier"/>
     <Field name="schema"/>
     <Bug pattern="SE_BAD_FIELD"/>
     <!--
     writeReplace makes this object serializable. This is a limitation of FindBugs as discussed here:
-     http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
+    http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
     -->
   </Match>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
new file mode 100644
index 0000000..e164ee0
--- /dev/null
+++ b/sdks/java/harness/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <packaging>jar</packaging>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-harness</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Harness</name>
+  <description>This contains the SDK Fn Harness for Beam Java</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <!--  Override Beam parent to allow Java8 -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-fn-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.errorprone</groupId>
+      <artifactId>error_prone_annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-lite</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <classifier>linux-x86_64</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
new file mode 100644
index 0000000..3e06f38
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.TextFormat;
+import java.io.PrintStream;
+import java.util.EnumMap;
+import org.apache.beam.fn.harness.channel.ManagedChannelFactory;
+import org.apache.beam.fn.harness.control.BeamFnControlClient;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler;
+import org.apache.beam.fn.harness.control.RegisterHandler;
+import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
+import org.apache.beam.fn.harness.fn.ThrowingFunction;
+import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
+import org.apache.beam.fn.harness.stream.StreamObserverFactory;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point into the Beam SDK Fn Harness for Java.
+ *
+ * <p>This entry point expects the following environment variables:
+ * <ul>
+ *   <li>LOGGING_API_SERVICE_DESCRIPTOR: A
+ *   {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text
+ *   representing the endpoint that is to be connected to for the Beam Fn Logging service.</li>
+ *   <li>CONTROL_API_SERVICE_DESCRIPTOR: A
+ *   {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text
+ *   representing the endpoint that is to be connected to for the Beam Fn Control service.</li>
+ *   <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions}
+ *   for further details.</li>
+ * </ul>
+ */
+public class FnHarness {
+  private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
+  private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
+  private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
+  private static final Logger LOGGER = LoggerFactory.getLogger(FnHarness.class);
+
+  private static BeamFnApi.ApiServiceDescriptor getApiServiceDescriptor(String env)
+      throws TextFormat.ParseException {
+    BeamFnApi.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
+        BeamFnApi.ApiServiceDescriptor.newBuilder();
+    TextFormat.merge(System.getenv(env), apiServiceDescriptorBuilder);
+    return apiServiceDescriptorBuilder.build();
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.out.format("SDK Fn Harness started%n");
+    System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
+    System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
+    System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
+
+    PipelineOptions options = new ObjectMapper().readValue(
+        System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
+
+    BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor =
+        getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);
+
+    BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor =
+        getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR);
+
+    main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
+  }
+
+  public static void main(PipelineOptions options,
+      BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor,
+      BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
+    IOChannelUtils.registerIOFactories(options);
+
+    ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
+    StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
+    PrintStream originalErrStream = System.err;
+
+    try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
+        options,
+        loggingApiServiceDescriptor,
+        channelFactory::forDescriptor,
+        streamObserverFactory::from)) {
+
+      LOGGER.info("Fn Harness started");
+      EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+              ThrowingFunction<BeamFnApi.InstructionRequest,
+                               BeamFnApi.InstructionResponse.Builder>> handlers =
+          new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
+
+      RegisterHandler fnApiRegistry = new RegisterHandler();
+      BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
+          options, channelFactory::forDescriptor, streamObserverFactory::from);
+
+      ProcessBundleHandler processBundleHandler =
+          new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer);
+      handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
+          fnApiRegistry::register);
+      handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
+          processBundleHandler::processBundle);
+      BeamFnControlClient control = new BeamFnControlClient(controlApiServiceDescriptor,
+          channelFactory::forDescriptor,
+          streamObserverFactory::from,
+          handlers);
+
+      LOGGER.info("Entering instruction processing loop");
+      control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
+    } catch (Throwable t) {
+      t.printStackTrace(originalErrStream);
+    } finally {
+      originalErrStream.println("Shutting SDK harness down.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
new file mode 100644
index 0000000..d26f4a5
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.channel;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Uses {@link PipelineOptions} to configure which underlying {@link ManagedChannel} implementation
+ * to use.
+ */
+public abstract class ManagedChannelFactory {
+  public static ManagedChannelFactory from(PipelineOptions options) {
+    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
+      io.netty.channel.epoll.Epoll.ensureAvailability();
+      return new Epoll();
+    }
+    return new Default();
+  }
+
+  public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
+
+  /**
+   * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
+   * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an
+   * {@link EpollSocketChannel}.
+   */
+  private static class Epoll extends ManagedChannelFactory {
+    @Override
+    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+      SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
+      return NettyChannelBuilder.forAddress(address)
+          .channelType(address instanceof DomainSocketAddress
+              ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
+          .eventLoopGroup(new EpollEventLoopGroup())
+          .usePlaintext(true)
+          .build();
+    }
+  }
+
+  /**
+   * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create
+   * instances.
+   */
+  private static class Default extends ManagedChannelFactory {
+    @Override
+    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+      return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
+          .usePlaintext(true)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
new file mode 100644
index 0000000..a27d542
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.channel;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/** Creates a {@link SocketAddress} based upon a supplied string. */
+public class SocketAddressFactory {
+  private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
+
+  /**
+   * Parse a {@link SocketAddress} from the given string.
+   */
+  public static SocketAddress createFrom(String value) {
+    if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
+      // Unix Domain Socket address.
+      // Create the underlying file for the Unix Domain Socket.
+      String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
+      File file = new File(filePath);
+      if (!file.isAbsolute()) {
+        throw new IllegalArgumentException("File path must be absolute: " + filePath);
+      }
+      try {
+        if (file.createNewFile()) {
+          // If this application created the file, delete it when the application exits.
+          file.deleteOnExit();
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      // Create the SocketAddress referencing the file.
+      return new DomainSocketAddress(file);
+    } else {
+      // Standard TCP/IP address.
+      HostAndPort hostAndPort = HostAndPort.fromString(value);
+      checkArgument(hostAndPort.hasPort(),
+          "Address must be a unix:// path or be in the form host:port. Got: %s", value);
+      return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
new file mode 100644
index 0000000..6323166
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC channel management.
+ *
+ * <p>Factories for constructing gRPC channels and the socket addresses they connect to.
+ */
+package org.apache.beam.fn.harness.channel;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
new file mode 100644
index 0000000..7f44a01
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.EnumMap;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.ThrowingFunction;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client for the Beam Fn Control API. Uses an unbounded internal queue to pull down
+ * an unbounded number of requests.
+ *
+ * <p>Also can delegate to a set of handlers based upon the
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest.RequestCase request type}.
+ *
+ * <p>When the inbound instruction stream finishes successfully, the {@code onFinish} is
+ * completed successfully signaling to the caller that this client will not produce any more
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s. If the inbound instruction stream
+ * errors, the {@code onFinish} is completed exceptionally propagating the failure reason
+ * to the caller and signaling that this client will not produce any more
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s.
+ */
+public class BeamFnControlClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnControlClient.class);
+  // Sentinel enqueued when the inbound stream terminates; matched by reference equality in
+  // processInstructionRequests, so the instruction id value itself is never interpreted.
+  private static final BeamFnApi.InstructionRequest POISON_PILL =
+      BeamFnApi.InstructionRequest.newBuilder().setInstructionId(Long.MIN_VALUE).build();
+
+  // Stream used to send InstructionResponses back to the runner.
+  private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver;
+  // Unbounded queue of requests received from the runner, drained by processInstructionRequests.
+  private final BlockingDeque<BeamFnApi.InstructionRequest> bufferedInstructions;
+  // Per-request-type handlers; requests with no registered handler produce an error response.
+  private final EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+                        ThrowingFunction<BeamFnApi.InstructionRequest,
+                                         BeamFnApi.InstructionResponse.Builder>> handlers;
+  // Completed (normally or exceptionally) when the inbound instruction stream terminates.
+  private final CompletableFuture<Void> onFinish;
+
+  /**
+   * @param apiServiceDescriptor location of the Beam Fn Control service
+   * @param channelFactory creates a {@link ManagedChannel} for the given service descriptor
+   * @param streamObserverFactory wires an inbound observer to the control stub, returning the
+   *     outbound observer used to send responses
+   * @param handlers dispatch table keyed by {@code RequestCase}
+   */
+  public BeamFnControlClient(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      BiFunction<Function<StreamObserver<BeamFnApi.InstructionRequest>,
+                          StreamObserver<BeamFnApi.InstructionResponse>>,
+                 StreamObserver<BeamFnApi.InstructionRequest>,
+                 StreamObserver<BeamFnApi.InstructionResponse>> streamObserverFactory,
+      EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+              ThrowingFunction<BeamFnApi.InstructionRequest,
+                               BeamFnApi.InstructionResponse.Builder>> handlers) {
+    this.bufferedInstructions = new LinkedBlockingDeque<>();
+    this.outboundObserver = streamObserverFactory.apply(
+        BeamFnControlGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::control,
+        new InboundObserver());
+    this.handlers = handlers;
+    this.onFinish = new CompletableFuture<>();
+  }
+
+  /**
+   * A {@link StreamObserver} for the inbound stream that completes the future on stream
+   * termination.
+   */
+  private class InboundObserver implements StreamObserver<BeamFnApi.InstructionRequest> {
+    @Override
+    public void onNext(BeamFnApi.InstructionRequest value) {
+      LOGGER.info("InstructionRequest received {}", value);
+      // The queue is unbounded so this put cannot block indefinitely; putUninterruptibly
+      // preserves the thread's interrupt status if an interrupt occurs.
+      Uninterruptibles.putUninterruptibly(bufferedInstructions, value);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      // Enqueue the pill first so the consumer wakes up; onFinish.get() in
+      // processInstructionRequests blocks until the failure below is recorded.
+      placePoisonPillIntoQueue();
+      onFinish.completeExceptionally(t);
+    }
+
+    @Override
+    public void onCompleted() {
+      placePoisonPillIntoQueue();
+      onFinish.complete(null);
+    }
+
+    /**
+     * This method emulates {@link Uninterruptibles#putUninterruptibly} but placing the
+     * element at the front of the queue.
+     *
+     * <p>We place the poison pill at the front of the queue because if the server shuts down,
+     * any remaining instructions can be discarded.
+     */
+    private void placePoisonPillIntoQueue() {
+      while (true) {
+        try {
+          bufferedInstructions.putFirst(POISON_PILL);
+          return;
+        } catch (InterruptedException e) {
+          // Ignored until we place the poison pill into the queue
+        }
+      }
+    }
+  }
+
+  /**
+   * Note that this method continuously submits work to the supplied executor until the
+   * Beam Fn Control server hangs up or fails exceptionally.
+   *
+   * @throws InterruptedException if interrupted while waiting for the next request
+   * @throws ExecutionException if the inbound instruction stream terminated with an error
+   */
+  public void processInstructionRequests(Executor executor)
+      throws InterruptedException, ExecutionException {
+    BeamFnApi.InstructionRequest request;
+    // Reference comparison is intentional: POISON_PILL is the exact instance enqueued by
+    // InboundObserver on stream termination.
+    while ((request = bufferedInstructions.take()) != POISON_PILL) {
+      BeamFnApi.InstructionRequest currentRequest = request;
+      executor.execute(
+          () -> sendInstructionResponse(delegateOnInstructionRequestType(currentRequest)));
+    }
+    // Propagate the stream's terminal state (throws if the stream errored).
+    onFinish.get();
+  }
+
+  /**
+   * Dispatches the request to the handler registered for its type. Any exception thrown by a
+   * handler is converted into an error response carrying the stack trace, so failures never
+   * escape into the executor.
+   */
+  public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
+      BeamFnApi.InstructionRequest value) {
+    try {
+      return handlers.getOrDefault(value.getRequestCase(), this::missingHandler)
+          .apply(value)
+          .setInstructionId(value.getInstructionId())
+          .build();
+    } catch (Exception e) {
+      return BeamFnApi.InstructionResponse.newBuilder()
+          .setInstructionId(value.getInstructionId())
+          .setError(getStackTraceAsString(e))
+          .build();
+    }
+  }
+
+  /** Sends the response to the runner over the outbound control stream. */
+  public void sendInstructionResponse(BeamFnApi.InstructionResponse value) {
+    outboundObserver.onNext(value);
+  }
+
+  /** Fallback handler producing an error response for unregistered request types. */
+  private BeamFnApi.InstructionResponse.Builder missingHandler(
+      BeamFnApi.InstructionRequest request) {
+    return BeamFnApi.InstructionResponse.newBuilder().setError(
+        String.format("Unknown InstructionRequest type %s", request.getRequestCase()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
new file mode 100644
index 0000000..05c2aab
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fake.FakeAggregatorFactory;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.BeamFnDataReadRunner;
+import org.apache.beam.runners.core.BeamFnDataWriteRunner;
+import org.apache.beam.runners.core.BoundedSourceRunner;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing
+ * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec},
+ * wiring them together based upon the {@code input} and {@code output} map definitions.
+ *
+ * <p>Finally executes the DAG based graph by starting all runners in reverse topological order,
+ * and finishing all runners in forward topological order.
+ */
+public class ProcessBundleHandler {
+  // TODO: What should the initial set of URNs be?
+  private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
+  private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
+  private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
+  private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProcessBundleHandler.class);
+
+  private final PipelineOptions options;
+  // Resolves reference ids (e.g. ProcessBundleDescriptor, Coder references) to previously
+  // registered Fn API messages.
+  private final Function<Long, Message> fnApiRegistry;
+  // Client used by the data read/write runners to move elements over the Beam Fn Data API.
+  private final BeamFnDataClient beamFnDataClient;
+
+  /**
+   * @param options pipeline options forwarded to runners that need them
+   * @param fnApiRegistry lookup from reference id to registered Fn API {@link Message}
+   * @param beamFnDataClient client for the Beam Fn Data API
+   */
+  public ProcessBundleHandler(
+      PipelineOptions options,
+      Function<Long, Message> fnApiRegistry,
+      BeamFnDataClient beamFnDataClient) {
+    this.options = options;
+    this.fnApiRegistry = fnApiRegistry;
+    this.beamFnDataClient = beamFnDataClient;
+  }
+
+  /**
+   * Materializes the runner for a single primitive transform, registering its input consumer,
+   * start function and finish function with the supplied callbacks.
+   *
+   * @param primitiveTransform the transform to materialize
+   * @param processBundleInstructionId supplies the current bundle's instruction id
+   * @param consumers lookup of consumers already registered for a given output target
+   * @param addConsumer registers a consumer for one of this transform's input targets
+   * @param addStartFunction registers work to run before elements flow
+   * @param addFinishFunction registers work to run after elements flow
+   */
+  protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
+      BeamFnApi.PrimitiveTransform primitiveTransform,
+      Supplier<Long> processBundleInstructionId,
+      Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
+      BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+      Consumer<ThrowingRunnable> addStartFunction,
+      Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+    BeamFnApi.FunctionSpec functionSpec = primitiveTransform.getFunctionSpec();
+
+    // For every output PCollection, create a map from output name to Consumer
+    ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>>
+        outputMapBuilder = ImmutableMap.builder();
+    for (Map.Entry<String, BeamFnApi.PCollection> entry :
+        primitiveTransform.getOutputsMap().entrySet()) {
+      outputMapBuilder.put(
+          entry.getKey(),
+          consumers.apply(
+              BeamFnApi.Target.newBuilder()
+                  .setPrimitiveTransformReference(primitiveTransform.getId())
+                  .setName(entry.getKey())
+                  .build()));
+    }
+    ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap =
+        outputMapBuilder.build();
+
+    // Based upon the function spec, populate the start/finish/consumer information.
+    ThrowingConsumer<WindowedValue<InputT>> consumer;
+    switch (functionSpec.getUrn()) {
+      // The default case is listed first so the target/coderSpec locals shared by the cases
+      // below can be declared once here (Java switch cases share one scope).
+      default:
+        BeamFnApi.Target target;
+        BeamFnApi.Coder coderSpec;
+        throw new IllegalArgumentException(
+            String.format("Unknown FunctionSpec %s", functionSpec));
+
+      case DATA_OUTPUT_URN:
+        target = BeamFnApi.Target.newBuilder()
+            .setPrimitiveTransformReference(primitiveTransform.getId())
+            .setName(getOnlyElement(primitiveTransform.getOutputsMap().keySet()))
+            .build();
+        coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply(
+            getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference());
+        BeamFnDataWriteRunner<InputT> remoteGrpcWriteRunner =
+            new BeamFnDataWriteRunner<>(
+                functionSpec,
+                processBundleInstructionId,
+                target,
+                coderSpec,
+                beamFnDataClient);
+        addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput);
+        consumer = remoteGrpcWriteRunner::consume;
+        addFinishFunction.accept(remoteGrpcWriteRunner::close);
+        break;
+
+      case DATA_INPUT_URN:
+        target = BeamFnApi.Target.newBuilder()
+            .setPrimitiveTransformReference(primitiveTransform.getId())
+            .setName(getOnlyElement(primitiveTransform.getInputsMap().keySet()))
+            .build();
+        // NOTE(review): the coder is resolved from the outputs map while the target names an
+        // input — presumably the read produces elements for the output PCollection, so its
+        // coder applies; confirm this asymmetry against the runner-side contract.
+        coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply(
+            getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference());
+        BeamFnDataReadRunner<OutputT> remoteGrpcReadRunner =
+            new BeamFnDataReadRunner<>(
+                functionSpec,
+                processBundleInstructionId,
+                target,
+                coderSpec,
+                beamFnDataClient,
+                outputMap);
+        addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation);
+        // A data input has no upstream consumer within this bundle; elements arrive via gRPC.
+        consumer = null;
+        addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes);
+        break;
+
+      case JAVA_DO_FN_URN:
+        DoFnRunner<InputT, OutputT> doFnRunner = createDoFnRunner(functionSpec, outputMap);
+        addStartFunction.accept(doFnRunner::startBundle);
+        addFinishFunction.accept(doFnRunner::finishBundle);
+        consumer = doFnRunner::processElement;
+        break;
+
+      case JAVA_SOURCE_URN:
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        BoundedSourceRunner<BoundedSource<OutputT>, OutputT> sourceRunner =
+            createBoundedSourceRunner(functionSpec, outputMap);
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        ThrowingConsumer<WindowedValue<?>> sourceConsumer =
+            (ThrowingConsumer)
+                (ThrowingConsumer<WindowedValue<BoundedSource<OutputT>>>)
+                    sourceRunner::runReadLoop;
+        // TODO: Remove and replace with source being sent across gRPC port
+        addStartFunction.accept(sourceRunner::start);
+        consumer = (ThrowingConsumer) sourceConsumer;
+        break;
+    }
+
+    // Register the consumer (when one exists) for every input target of this transform.
+    if (consumer != null) {
+      for (Map.Entry<String, BeamFnApi.Target.List> entry :
+          primitiveTransform.getInputsMap().entrySet()) {
+        for (BeamFnApi.Target target : entry.getValue().getTargetList()) {
+          addConsumer.accept(target, consumer);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the runner graph for the referenced {@link BeamFnApi.ProcessBundleDescriptor},
+   * then executes it: all start functions run first, all finish functions run after, both in
+   * topological order.
+   */
+  public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request)
+      throws Exception {
+    BeamFnApi.InstructionResponse.Builder response =
+        BeamFnApi.InstructionResponse.newBuilder()
+            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
+
+    long bundleId = request.getProcessBundle().getProcessBundleDescriptorReference();
+    BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
+        (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId);
+
+    Multimap<BeamFnApi.Target,
+             ThrowingConsumer<WindowedValue<Object>>> outputTargetToConsumer =
+             HashMultimap.create();
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+    // We process the primitive transform list in reverse order
+    // because we assume that the runner provides it in topological order.
+    // This means that all the start/finish functions will be in reverse topological order.
+    for (BeamFnApi.PrimitiveTransform primitiveTransform :
+        Lists.reverse(bundleDescriptor.getPrimitiveTransformList())) {
+      createConsumersForPrimitiveTransform(
+          primitiveTransform,
+          request::getInstructionId,
+          outputTargetToConsumer::get,
+          outputTargetToConsumer::put,
+          startFunctions::add,
+          finishFunctions::add);
+    }
+
+    // Already in reverse order so we don't need to do anything.
+    for (ThrowingRunnable startFunction : startFunctions) {
+      LOGGER.debug("Starting function {}", startFunction);
+      startFunction.run();
+    }
+
+    // Need to reverse this since we want to call finish in topological order.
+    for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) {
+      LOGGER.debug("Finishing function {}", finishFunction);
+      finishFunction.run();
+    }
+
+    return response;
+  }
+
+  /**
+   * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}.
+   *
+   * <p>The spec's data payload is expected to be a {@link BytesValue} wrapping a Java-serialized
+   * {@link DoFnInfo}; the transform's output names must match the DoFnInfo's output map keys
+   * (as longs).
+   */
+  private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner(
+      BeamFnApi.FunctionSpec functionSpec,
+      Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+    ByteString serializedFn;
+    try {
+      serializedFn = functionSpec.getData().unpack(BytesValue.class).getValue();
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          String.format("Unable to unwrap DoFn %s", functionSpec), e);
+    }
+    DoFnInfo<?, ?> doFnInfo =
+        (DoFnInfo<?, ?>)
+            SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo");
+
+    // The transform's outputs (String keys) and the DoFnInfo's outputs (Long keys) must agree.
+    checkArgument(
+        Objects.equals(
+            new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)),
+            doFnInfo.getOutputMap().keySet()),
+        "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.",
+        outputMap.keySet(),
+        doFnInfo.getOutputMap());
+
+    // Re-key the output consumers by TupleTag so the OutputManager can route by tag.
+    ImmutableMultimap.Builder<TupleTag<?>,
+                              ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput =
+                              ImmutableMultimap.builder();
+    for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) {
+      tagToOutput.putAll(entry.getValue(), outputMap.get(Long.toString(entry.getKey())));
+    }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    final Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tagBasedOutputMap =
+        (Map) tagToOutput.build().asMap();
+
+    OutputManager outputManager =
+        new OutputManager() {
+          Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput =
+              tagBasedOutputMap;
+
+          @Override
+          public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+            try {
+              Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+                  tupleTagToOutput.get(tag);
+              if (consumers == null) {
+                // TODO: Should we handle undeclared outputs, if so how?
+                throw new UnsupportedOperationException(String.format(
+                    "Unable to output %s on unknown output %s", output, tag));
+              }
+              for (ThrowingConsumer<WindowedValue<?>> consumer : consumers) {
+                consumer.accept(output);
+              }
+            } catch (Throwable t) {
+              throw new RuntimeException(t);
+            }
+          }
+        };
+
+    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+    DoFnRunner<InputT, OutputT> runner =
+        DoFnRunners.simpleRunner(
+            PipelineOptionsFactory.create(), /* TODO */
+            (DoFn) doFnInfo.getDoFn(),
+            NullSideInputReader.empty(), /* TODO */
+            outputManager,
+            (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
+            new ArrayList<>(doFnInfo.getOutputMap().values()),
+            new FakeStepContext(),
+            new FakeAggregatorFactory(),
+            (WindowingStrategy) doFnInfo.getWindowingStrategy());
+    return runner;
+  }
+
+  /**
+   * Constructs a {@link BoundedSourceRunner} for the given spec, writing to the supplied
+   * output consumers.
+   */
+  private <InputT extends BoundedSource<OutputT>, OutputT>
+      BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner(
+          BeamFnApi.FunctionSpec functionSpec,
+          Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    BoundedSourceRunner<InputT, OutputT> runner =
+        new BoundedSourceRunner(options, functionSpec, outputMap);
+    return runner;
+  }
+}