You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/05 20:06:17 UTC

[1/2] beam git commit: [BEAM-1347] Migrate to Runner API constructs within the Java SDK harness

Repository: beam
Updated Branches:
  refs/heads/master ad2c1f1fc -> bf2d30058


[BEAM-1347] Migrate to Runner API constructs within the Java SDK harness


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9511257a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9511257a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9511257a

Branch: refs/heads/master
Commit: 9511257ac0a59ecc56ea2f5a6646954e964d7fb4
Parents: ad2c1f1
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jun 5 11:01:54 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 5 13:04:55 2017 -0700

----------------------------------------------------------------------
 sdks/java/harness/pom.xml                       |   5 +
 .../harness/control/ProcessBundleHandler.java   | 178 ++++++---
 .../fn/harness/control/RegisterHandler.java     |  12 +-
 .../harness/data/BeamFnDataGrpcMultiplexer.java |   8 +-
 .../beam/runners/core/BeamFnDataReadRunner.java |  12 +-
 .../runners/core/BeamFnDataWriteRunner.java     |  12 +-
 .../beam/runners/core/BoundedSourceRunner.java  |  10 +-
 .../control/ProcessBundleHandlerTest.java       | 400 ++++++++-----------
 .../fn/harness/control/RegisterHandlerTest.java |  26 +-
 .../runners/core/BeamFnDataReadRunnerTest.java  |  18 +-
 .../runners/core/BeamFnDataWriteRunnerTest.java |  20 +-
 .../runners/core/BoundedSourceRunnerTest.java   |   8 +-
 12 files changed, 371 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 3918fd9..61a170a 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -88,6 +88,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-common-fn-api</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index fd9f0df..e33277a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -38,7 +38,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -55,6 +54,7 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -96,108 +96,145 @@ public class ProcessBundleHandler {
     this.beamFnDataClient = beamFnDataClient;
   }
 
-  protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
-      BeamFnApi.PrimitiveTransform primitiveTransform,
+  private void createRunnerAndConsumersForPTransformRecursively(
+      String pTransformId,
+      RunnerApi.PTransform pTransform,
       Supplier<String> processBundleInstructionId,
-      Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
-      BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+      BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
+      Multimap<String, String> pCollectionIdsToConsumingPTransforms,
+      Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+      Consumer<ThrowingRunnable> addStartFunction,
+      Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+    // Recursively ensure that all consumers of the output PCollection have been created.
+    // Since we are creating the consumers first, we know that we are building the DAG
+    // in reverse topological order.
+    for (String pCollectionId : pTransform.getOutputsMap().values()) {
+      // If we have created the consumers for this PCollection we can skip it.
+      if (pCollectionIdsToConsumers.containsKey(pCollectionId)) {
+        continue;
+      }
+
+      for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get(pCollectionId)) {
+        createRunnerAndConsumersForPTransformRecursively(
+            consumingPTransformId,
+            processBundleDescriptor.getTransformsMap().get(consumingPTransformId),
+            processBundleInstructionId,
+            processBundleDescriptor,
+            pCollectionIdsToConsumingPTransforms,
+            pCollectionIdsToConsumers,
+            addStartFunction,
+            addFinishFunction);
+      }
+    }
+
+    createRunnerForPTransform(
+        pTransformId,
+        pTransform,
+        processBundleInstructionId,
+        processBundleDescriptor.getPcollectionsMap(),
+        pCollectionIdsToConsumers,
+        addStartFunction,
+        addFinishFunction);
+  }
+
+  protected void createRunnerForPTransform(
+      String pTransformId,
+      RunnerApi.PTransform pTransform,
+      Supplier<String> processBundleInstructionId,
+      Map<String, RunnerApi.PCollection> pCollections,
+      Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
       Consumer<ThrowingRunnable> addStartFunction,
       Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
 
-    BeamFnApi.FunctionSpec functionSpec = primitiveTransform.getFunctionSpec();
 
     // For every output PCollection, create a map from output name to Consumer
-    ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>>
+    ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>>
         outputMapBuilder = ImmutableMap.builder();
-    for (Map.Entry<String, BeamFnApi.PCollection> entry :
-        primitiveTransform.getOutputsMap().entrySet()) {
+    for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
       outputMapBuilder.put(
           entry.getKey(),
-          consumers.apply(
-              BeamFnApi.Target.newBuilder()
-                  .setPrimitiveTransformReference(primitiveTransform.getId())
-                  .setName(entry.getKey())
-                  .build()));
+          pCollectionIdsToConsumers.get(entry.getValue()));
     }
-    ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap =
+    ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap =
         outputMapBuilder.build();
 
+
     // Based upon the function spec, populate the start/finish/consumer information.
-    ThrowingConsumer<WindowedValue<InputT>> consumer;
+    RunnerApi.FunctionSpec functionSpec = pTransform.getSpec();
+    ThrowingConsumer<WindowedValue<?>> consumer;
     switch (functionSpec.getUrn()) {
       default:
         BeamFnApi.Target target;
-        BeamFnApi.Coder coderSpec;
+        RunnerApi.Coder coderSpec;
         throw new IllegalArgumentException(
             String.format("Unknown FunctionSpec %s", functionSpec));
 
       case DATA_OUTPUT_URN:
         target = BeamFnApi.Target.newBuilder()
-            .setPrimitiveTransformReference(primitiveTransform.getId())
-            .setName(getOnlyElement(primitiveTransform.getOutputsMap().keySet()))
+            .setPrimitiveTransformReference(pTransformId)
+            .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
             .build();
-        coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply(
-            getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference());
-        BeamFnDataWriteRunner<InputT> remoteGrpcWriteRunner =
-            new BeamFnDataWriteRunner<>(
+        coderSpec = (RunnerApi.Coder) fnApiRegistry.apply(
+            pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+        BeamFnDataWriteRunner<Object> remoteGrpcWriteRunner =
+            new BeamFnDataWriteRunner<Object>(
                 functionSpec,
                 processBundleInstructionId,
                 target,
                 coderSpec,
                 beamFnDataClient);
         addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput);
-        consumer = remoteGrpcWriteRunner::consume;
+        consumer = (ThrowingConsumer)
+            (ThrowingConsumer<WindowedValue<Object>>) remoteGrpcWriteRunner::consume;
         addFinishFunction.accept(remoteGrpcWriteRunner::close);
         break;
 
       case DATA_INPUT_URN:
         target = BeamFnApi.Target.newBuilder()
-            .setPrimitiveTransformReference(primitiveTransform.getId())
-            .setName(getOnlyElement(primitiveTransform.getInputsMap().keySet()))
+            .setPrimitiveTransformReference(pTransformId)
+            .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
             .build();
-        coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply(
-            getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference());
-        BeamFnDataReadRunner<OutputT> remoteGrpcReadRunner =
-            new BeamFnDataReadRunner<>(
+        coderSpec = (RunnerApi.Coder) fnApiRegistry.apply(
+            pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+        BeamFnDataReadRunner<?> remoteGrpcReadRunner =
+            new BeamFnDataReadRunner<Object>(
                 functionSpec,
                 processBundleInstructionId,
                 target,
                 coderSpec,
                 beamFnDataClient,
-                outputMap);
+                (Map) outputMap);
         addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation);
         consumer = null;
         addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes);
         break;
 
       case JAVA_DO_FN_URN:
-        DoFnRunner<InputT, OutputT> doFnRunner = createDoFnRunner(functionSpec, outputMap);
+        DoFnRunner<Object, Object> doFnRunner = createDoFnRunner(functionSpec, (Map) outputMap);
         addStartFunction.accept(doFnRunner::startBundle);
+        consumer = (ThrowingConsumer)
+            (ThrowingConsumer<WindowedValue<Object>>) doFnRunner::processElement;
         addFinishFunction.accept(doFnRunner::finishBundle);
-        consumer = doFnRunner::processElement;
         break;
 
       case JAVA_SOURCE_URN:
         @SuppressWarnings({"unchecked", "rawtypes"})
-        BoundedSourceRunner<BoundedSource<OutputT>, OutputT> sourceRunner =
-            createBoundedSourceRunner(functionSpec, outputMap);
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        ThrowingConsumer<WindowedValue<?>> sourceConsumer =
-            (ThrowingConsumer)
-                (ThrowingConsumer<WindowedValue<BoundedSource<OutputT>>>)
-                    sourceRunner::runReadLoop;
+        BoundedSourceRunner<BoundedSource<Object>, Object> sourceRunner =
+            createBoundedSourceRunner(functionSpec, (Map) outputMap);
         // TODO: Remove and replace with source being sent across gRPC port
         addStartFunction.accept(sourceRunner::start);
-        consumer = (ThrowingConsumer) sourceConsumer;
+        consumer = (ThrowingConsumer)
+            (ThrowingConsumer<WindowedValue<BoundedSource<Object>>>)
+                sourceRunner::runReadLoop;
         break;
     }
 
+    // If we created a consumer, add it to the map containing PCollection ids to consumers
     if (consumer != null) {
-      for (Map.Entry<String, BeamFnApi.Target.List> entry :
-          primitiveTransform.getInputsMap().entrySet()) {
-        for (BeamFnApi.Target target : entry.getValue().getTargetList()) {
-          addConsumer.accept(target, consumer);
-        }
+      for (String inputPCollectionId :
+          pTransform.getInputsMap().values()) {
+        pCollectionIdsToConsumers.put(inputPCollectionId, consumer);
       }
     }
   }
@@ -212,26 +249,43 @@ public class ProcessBundleHandler {
     BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
         (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId);
 
-    Multimap<BeamFnApi.Target,
-             ThrowingConsumer<WindowedValue<Object>>> outputTargetToConsumer =
-             HashMultimap.create();
+    Multimap<String, String> pCollectionIdsToConsumingPTransforms = HashMultimap.create();
+    Multimap<String,
+        ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers =
+        HashMultimap.create();
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-    // We process the primitive transform list in reverse order
-    // because we assume that the runner provides it in topologically order.
-    // This means that all the start/finish functions will be in reverse topological order.
-    for (BeamFnApi.PrimitiveTransform primitiveTransform :
-        Lists.reverse(bundleDescriptor.getPrimitiveTransformList())) {
-      createConsumersForPrimitiveTransform(
-          primitiveTransform,
+
+    // Build a multimap of PCollection ids to PTransform ids which consume said PCollections
+    for (Map.Entry<String, RunnerApi.PTransform> entry
+        : bundleDescriptor.getTransformsMap().entrySet()) {
+      for (String pCollectionId : entry.getValue().getInputsMap().values()) {
+        pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey());
+      }
+    }
+
+    // Create the runners and consumers for each root PTransform and everything downstream.
+    for (Map.Entry<String, RunnerApi.PTransform> entry
+        : bundleDescriptor.getTransformsMap().entrySet()) {
+      // Skip anything which isn't a root
+      // TODO: Remove source as a root and have it be triggered by the Runner.
+      if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
+          && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())) {
+        continue;
+      }
+
+      createRunnerAndConsumersForPTransformRecursively(
+          entry.getKey(),
+          entry.getValue(),
           request::getInstructionId,
-          outputTargetToConsumer::get,
-          outputTargetToConsumer::put,
+          bundleDescriptor,
+          pCollectionIdsToConsumingPTransforms,
+          pCollectionIdsToConsumers,
           startFunctions::add,
           finishFunctions::add);
     }
 
-    // Already in reverse order so we don't need to do anything.
+    // Already in reverse topological order so we don't need to do anything.
     for (ThrowingRunnable startFunction : startFunctions) {
       LOG.debug("Starting function {}", startFunction);
       startFunction.run();
@@ -250,11 +304,11 @@ public class ProcessBundleHandler {
    * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}.
    */
   private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner(
-      BeamFnApi.FunctionSpec functionSpec,
+      RunnerApi.FunctionSpec functionSpec,
       Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
     ByteString serializedFn;
     try {
-      serializedFn = functionSpec.getData().unpack(BytesValue.class).getValue();
+      serializedFn = functionSpec.getParameter().unpack(BytesValue.class).getValue();
     } catch (InvalidProtocolBufferException e) {
       throw new IllegalArgumentException(
           String.format("Unable to unwrap DoFn %s", functionSpec), e);
@@ -321,7 +375,7 @@ public class ProcessBundleHandler {
 
   private <InputT extends BoundedSource<OutputT>, OutputT>
       BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner(
-          BeamFnApi.FunctionSpec functionSpec,
+          RunnerApi.FunctionSpec functionSpec,
           Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
 
     @SuppressWarnings({"rawtypes", "unchecked"})

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index fb06231..276a120 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -19,12 +19,14 @@
 package org.apache.beam.fn.harness.control;
 
 import com.google.protobuf.Message;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +47,7 @@ public class RegisterHandler {
 
   public <T extends Message> T getById(String id) {
     try {
+      LOG.debug("Attempting to find {}", id);
       @SuppressWarnings("unchecked")
       CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id);
       /*
@@ -75,11 +78,12 @@ public class RegisterHandler {
           processBundleDescriptor.getId(),
           processBundleDescriptor.getClass());
       computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
-      for (BeamFnApi.Coder coder : processBundleDescriptor.getCodersList()) {
+      for (Map.Entry<String, RunnerApi.Coder> entry
+          : processBundleDescriptor.getCodersyyyMap().entrySet()) {
         LOG.debug("Registering {} with type {}",
-            coder.getFunctionSpec().getId(),
-            coder.getClass());
-        computeIfAbsent(coder.getFunctionSpec().getId()).complete(coder);
+            entry.getKey(),
+            entry.getValue().getClass());
+        computeIfAbsent(entry.getKey()).complete(entry.getValue());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
index 53dfe11..15e8c0d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
@@ -84,7 +84,7 @@ public class BeamFnDataGrpcMultiplexer {
       KV<String, BeamFnApi.Target> key) {
     return consumers.computeIfAbsent(
         key,
-        (KV<String, BeamFnApi.Target> providedKey) -> new CompletableFuture<>());
+        (KV<String, BeamFnApi.Target> unused) -> new CompletableFuture<>());
   }
 
   /**
@@ -102,7 +102,11 @@ public class BeamFnDataGrpcMultiplexer {
         try {
           KV<String, BeamFnApi.Target> key =
               KV.of(data.getInstructionReference(), data.getTarget());
-          futureForKey(key).get().accept(data);
+          CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = futureForKey(key);
+          if (!consumer.isDone()) {
+            LOG.debug("Received data for key {} without consumer ready.", key);
+          }
+          consumer.get().accept(data);
           if (data.getData().isEmpty()) {
             consumers.remove(key);
           }

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index e6928d1..f0fe274 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -33,6 +33,7 @@ import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
@@ -61,14 +62,14 @@ public class BeamFnDataReadRunner<OutputT> {
   private CompletableFuture<Void> readFuture;
 
   public BeamFnDataReadRunner(
-      BeamFnApi.FunctionSpec functionSpec,
+      RunnerApi.FunctionSpec functionSpec,
       Supplier<String> processBundleInstructionIdSupplier,
       BeamFnApi.Target inputTarget,
-      BeamFnApi.Coder coderSpec,
+      RunnerApi.Coder coderSpec,
       BeamFnDataClient beamFnDataClientFactory,
       Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap)
           throws IOException {
-    this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class)
+    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
         .getApiServiceDescriptor();
     this.inputTarget = inputTarget;
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
@@ -82,8 +83,9 @@ public class BeamFnDataReadRunner<OutputT> {
                 CloudObject.fromSpec(
                     OBJECT_MAPPER.readValue(
                         coderSpec
-                            .getFunctionSpec()
-                            .getData()
+                            .getSpec()
+                            .getSpec()
+                            .getParameter()
                             .unpack(BytesValue.class)
                             .getValue()
                             .newInput(),

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index a78da5d..a48df12 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -29,6 +29,7 @@ import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -51,13 +52,13 @@ public class BeamFnDataWriteRunner<InputT> {
   private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
 
   public BeamFnDataWriteRunner(
-      BeamFnApi.FunctionSpec functionSpec,
+      RunnerApi.FunctionSpec functionSpec,
       Supplier<String> processBundleInstructionIdSupplier,
       BeamFnApi.Target outputTarget,
-      BeamFnApi.Coder coderSpec,
+      RunnerApi.Coder coderSpec,
       BeamFnDataClient beamFnDataClientFactory)
           throws IOException {
-    this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class)
+    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
         .getApiServiceDescriptor();
     this.beamFnDataClientFactory = beamFnDataClientFactory;
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
@@ -70,8 +71,9 @@ public class BeamFnDataWriteRunner<InputT> {
                 CloudObject.fromSpec(
                     OBJECT_MAPPER.readValue(
                         coderSpec
-                            .getFunctionSpec()
-                            .getData()
+                            .getSpec()
+                            .getSpec()
+                            .getParameter()
                             .unpack(BytesValue.class)
                             .getValue()
                             .newInput(),

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
index 9d9c433..4d530b8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source.Reader;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -39,12 +39,12 @@ import org.apache.beam.sdk.util.WindowedValue;
  */
 public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
   private final PipelineOptions pipelineOptions;
-  private final BeamFnApi.FunctionSpec definition;
+  private final RunnerApi.FunctionSpec definition;
   private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
 
   public BoundedSourceRunner(
       PipelineOptions pipelineOptions,
-      BeamFnApi.FunctionSpec definition,
+      RunnerApi.FunctionSpec definition,
       Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
     this.pipelineOptions = pipelineOptions;
     this.definition = definition;
@@ -61,7 +61,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
     try {
       // The representation here is defined as the java serialized representation of the
       // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
-      byte[] bytes = definition.getData().unpack(BytesValue.class).getValue().toByteArray();
+      byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
       @SuppressWarnings("unchecked")
       InputT boundedSource =
           (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
@@ -69,7 +69,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
     } catch (InvalidProtocolBufferException e) {
       throw new IOException(
           String.format("Failed to decode %s, expected %s",
-              definition.getData().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
+              definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
           e);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index f405728..562f91f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -21,9 +21,9 @@ package org.apache.beam.fn.harness.control;
 import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow;
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -39,8 +39,6 @@ import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.protobuf.Any;
@@ -49,14 +47,11 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
@@ -68,7 +63,7 @@ import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -105,22 +100,27 @@ public class ProcessBundleHandlerTest {
           .setId("58L")
           .setUrl("TestUrl"))
       .build();
-  private static final BeamFnApi.Coder LONG_CODER_SPEC;
-  private static final BeamFnApi.Coder STRING_CODER_SPEC;
+  private static final RunnerApi.Coder LONG_CODER_SPEC;
+  private static final RunnerApi.Coder STRING_CODER_SPEC;
   static {
     try {
-      STRING_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setId(STRING_CODER_SPEC_ID)
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))).build())))
+      STRING_CODER_SPEC = RunnerApi.Coder.newBuilder()
+          .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
+              .setSpec(RunnerApi.FunctionSpec.newBuilder()
+                  .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER))))
+                      .build())))
+              .build())
           .build();
-      LONG_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setId(STRING_CODER_SPEC_ID)
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(WindowedValue.getFullCoder(
-                  VarLongCoder.of(), GlobalWindow.Coder.INSTANCE))))).build())))
+      LONG_CODER_SPEC = RunnerApi.Coder.newBuilder()
+          .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
+              .setSpec(RunnerApi.FunctionSpec.newBuilder()
+                  .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+                      OBJECT_MAPPER.writeValueAsBytes(
+                          CloudObjects.asCloudObject(WindowedValue.getFullCoder(VarLongCoder.of(),
+                              GlobalWindow.Coder.INSTANCE)))))
+                      .build())))
+              .build())
           .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
@@ -146,12 +146,19 @@ public class ProcessBundleHandlerTest {
   public void testOrderOfStartAndFinishCalls() throws Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
-        .build();
+            .putTransforms("2L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                .putOutputs("2L-output", "2L-output-pc")
+                .build())
+            .putTransforms("3L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_OUTPUT_URN).build())
+                .putInputs("3L-input", "2L-output-pc")
+                .build())
+            .putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance())
+            .build();
     Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
 
-    List<BeamFnApi.PrimitiveTransform> transformsProcessed = new ArrayList<>();
+    List<RunnerApi.PTransform> transformsProcessed = new ArrayList<>();
     List<String> orderOfOperations = new ArrayList<>();
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -159,23 +166,22 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient) {
       @Override
-      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
-          BeamFnApi.PrimitiveTransform primitiveTransform,
+      protected void createRunnerForPTransform(
+          String pTransformId,
+          RunnerApi.PTransform pTransform,
           Supplier<String> processBundleInstructionId,
-          Function<BeamFnApi.Target,
-                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
-          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Map<String, RunnerApi.PCollection> pCollections,
+          Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
           Consumer<ThrowingRunnable> addStartFunction,
-          Consumer<ThrowingRunnable> addFinishFunction)
-          throws IOException {
+          Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
 
         assertThat(processBundleInstructionId.get(), equalTo("999L"));
 
-        transformsProcessed.add(primitiveTransform);
+        transformsProcessed.add(pTransform);
         addStartFunction.accept(
-            () -> orderOfOperations.add("Start" + primitiveTransform.getId()));
+            () -> orderOfOperations.add("Start" + pTransformId));
         addFinishFunction.accept(
-            () -> orderOfOperations.add("Finish" + primitiveTransform.getId()));
+            () -> orderOfOperations.add("Finish" + pTransformId));
       }
     };
     handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
@@ -184,21 +190,22 @@ public class ProcessBundleHandlerTest {
             BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
         .build());
 
-    // Processing of primitive transforms is performed in reverse order.
+    // Processing of transforms is performed in reverse order.
     assertThat(transformsProcessed, contains(
-        processBundleDescriptor.getPrimitiveTransform(1),
-        processBundleDescriptor.getPrimitiveTransform(0)));
+        processBundleDescriptor.getTransformsMap().get("3L"),
+        processBundleDescriptor.getTransformsMap().get("2L")));
     // Start should occur in reverse order while finish calls should occur in forward order
     assertThat(orderOfOperations, contains("Start3L", "Start2L", "Finish2L", "Finish3L"));
   }
 
   @Test
-  public void testCreatingPrimitiveTransformExceptionsArePropagated() throws Exception {
+  public void testCreatingPTransformExceptionsArePropagated() throws Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
-        .build();
+            .putTransforms("2L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                .build())
+            .build();
     Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -206,15 +213,14 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient) {
       @Override
-      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
-          BeamFnApi.PrimitiveTransform primitiveTransform,
+      protected void createRunnerForPTransform(
+          String pTransformId,
+          RunnerApi.PTransform pTransform,
           Supplier<String> processBundleInstructionId,
-          Function<BeamFnApi.Target,
-                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
-          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Map<String, RunnerApi.PCollection> pCollections,
+          Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
           Consumer<ThrowingRunnable> addStartFunction,
-          Consumer<ThrowingRunnable> addFinishFunction)
-          throws IOException {
+          Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
         thrown.expect(IllegalStateException.class);
         thrown.expectMessage("TestException");
         throw new IllegalStateException("TestException");
@@ -223,16 +229,17 @@ public class ProcessBundleHandlerTest {
     handler.processBundle(
         BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
             BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
-        .build());
+            .build());
   }
 
   @Test
-  public void testPrimitiveTransformStartExceptionsArePropagated() throws Exception {
+  public void testPTransformStartExceptionsArePropagated() throws Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
-        .build();
+            .putTransforms("2L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                .build())
+            .build();
     Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -240,15 +247,14 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient) {
       @Override
-      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
-          BeamFnApi.PrimitiveTransform primitiveTransform,
+      protected void createRunnerForPTransform(
+          String pTransformId,
+          RunnerApi.PTransform pTransform,
           Supplier<String> processBundleInstructionId,
-          Function<BeamFnApi.Target,
-                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
-          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Map<String, RunnerApi.PCollection> pCollections,
+          Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
           Consumer<ThrowingRunnable> addStartFunction,
-          Consumer<ThrowingRunnable> addFinishFunction)
-          throws IOException {
+          Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
         thrown.expect(IllegalStateException.class);
         thrown.expectMessage("TestException");
         addStartFunction.accept(this::throwException);
@@ -261,16 +267,17 @@ public class ProcessBundleHandlerTest {
     handler.processBundle(
         BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
             BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
-        .build());
+            .build());
   }
 
   @Test
-  public void testPrimitiveTransformFinishExceptionsArePropagated() throws Exception {
+  public void testPTransformFinishExceptionsArePropagated() throws Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
-        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
-        .build();
+            .putTransforms("2L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                .build())
+            .build();
     Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -278,15 +285,14 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient) {
       @Override
-      protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
-          BeamFnApi.PrimitiveTransform primitiveTransform,
+      protected void createRunnerForPTransform(
+          String pTransformId,
+          RunnerApi.PTransform pTransform,
           Supplier<String> processBundleInstructionId,
-          Function<BeamFnApi.Target,
-                   Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
-          BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+          Map<String, RunnerApi.PCollection> pCollections,
+          Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
           Consumer<ThrowingRunnable> addStartFunction,
-          Consumer<ThrowingRunnable> addFinishFunction)
-          throws IOException {
+          Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
         thrown.expect(IllegalStateException.class);
         thrown.expectMessage("TestException");
         addFinishFunction.accept(this::throwException);
@@ -299,7 +305,7 @@ public class ProcessBundleHandlerTest {
     handler.processBundle(
         BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
             BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
-        .build());
+            .build());
   }
 
   private static class TestDoFn extends DoFn<String, String> {
@@ -332,72 +338,40 @@ public class ProcessBundleHandlerTest {
   @Test
   public void testCreatingAndProcessingDoFn() throws Exception {
     Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
-    String primitiveTransformId = "100L";
-    long mainOutputId = 101L;
-    long additionalOutputId = 102L;
+    String pTransformId = "100L";
+    String mainOutputId = "101";
+    String additionalOutputId = "102";
 
     DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
         new TestDoFn(),
         WindowingStrategy.globalDefault(),
         ImmutableList.of(),
         StringUtf8Coder.of(),
-        mainOutputId,
+        Long.parseLong(mainOutputId),
         ImmutableMap.of(
-            mainOutputId, TestDoFn.mainOutput,
-            additionalOutputId, TestDoFn.additionalOutput));
-    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId("1L")
+            Long.parseLong(mainOutputId), TestDoFn.mainOutput,
+            Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
+    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
         .setUrn(JAVA_DO_FN_URN)
-        .setData(Any.pack(BytesValue.newBuilder()
+        .setParameter(Any.pack(BytesValue.newBuilder()
             .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
             .build()))
         .build();
-    BeamFnApi.Target inputATarget1 = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference("1000L")
-        .setName("inputATarget1")
-        .build();
-    BeamFnApi.Target inputATarget2 = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference("1001L")
-        .setName("inputATarget1")
-        .build();
-    BeamFnApi.Target inputBTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference("1002L")
-        .setName("inputBTarget")
-        .build();
-    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
-        .setId(primitiveTransformId)
-        .setFunctionSpec(functionSpec)
-        .putInputs("inputA", BeamFnApi.Target.List.newBuilder()
-            .addTarget(inputATarget1)
-            .addTarget(inputATarget2)
-            .build())
-        .putInputs("inputB", BeamFnApi.Target.List.newBuilder()
-            .addTarget(inputBTarget)
-            .build())
-        .putOutputs(Long.toString(mainOutputId), BeamFnApi.PCollection.newBuilder()
-            .setCoderReference(STRING_CODER_SPEC_ID)
-            .build())
-        .putOutputs(Long.toString(additionalOutputId), BeamFnApi.PCollection.newBuilder()
-            .setCoderReference(STRING_CODER_SPEC_ID)
-            .build())
+    RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
+        .setSpec(functionSpec)
+        .putInputs("inputA", "inputATarget")
+        .putInputs("inputB", "inputBTarget")
+        .putOutputs(mainOutputId, "mainOutputTarget")
+        .putOutputs(additionalOutputId, "additionalOutputTarget")
         .build();
 
     List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
     List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
-    BeamFnApi.Target mainOutputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(primitiveTransformId)
-        .setName(Long.toString(mainOutputId))
-        .build();
-    BeamFnApi.Target additionalOutputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(primitiveTransformId)
-        .setName(Long.toString(additionalOutputId))
-        .build();
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
-        ImmutableMultimap.of(
-            mainOutputTarget, mainOutputValues::add,
-            additionalOutputTarget, additionalOutputValues::add);
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
-        HashMultimap.create();
+    Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
+    consumers.put("mainOutputTarget",
+        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) mainOutputValues::add);
+    consumers.put("additionalOutputTarget",
+        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) additionalOutputValues::add);
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
@@ -405,23 +379,24 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient);
-    handler.createConsumersForPrimitiveTransform(
-        primitiveTransform,
+    handler.createRunnerForPTransform(
+        pTransformId,
+        pTransform,
         Suppliers.ofInstance("57L")::get,
-        existingConsumers::get,
-        newConsumers::put,
+        ImmutableMap.of(),
+        consumers,
         startFunctions::add,
         finishFunctions::add);
 
     Iterables.getOnlyElement(startFunctions).run();
     mainOutputValues.clear();
 
-    assertEquals(newConsumers.keySet(),
-        ImmutableSet.of(inputATarget1, inputATarget2, inputBTarget));
+    assertThat(consumers.keySet(), containsInAnyOrder(
+        "inputATarget", "inputBTarget", "mainOutputTarget", "additionalOutputTarget"));
 
-    Iterables.getOnlyElement(newConsumers.get(inputATarget1)).accept(valueInGlobalWindow("A1"));
-    Iterables.getOnlyElement(newConsumers.get(inputATarget1)).accept(valueInGlobalWindow("A2"));
-    Iterables.getOnlyElement(newConsumers.get(inputATarget1)).accept(valueInGlobalWindow("B"));
+    Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A1"));
+    Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A2"));
+    Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("B"));
     assertThat(mainOutputValues, contains(
         valueInGlobalWindow("MainOutputA1"),
         valueInGlobalWindow("MainOutputA2"),
@@ -444,44 +419,26 @@ public class ProcessBundleHandlerTest {
   @Test
   public void testCreatingAndProcessingSource() throws Exception {
     Map<String, Message> fnApiRegistry = ImmutableMap.of(LONG_CODER_SPEC_ID, LONG_CODER_SPEC);
-    String primitiveTransformId = "100L";
-    long outputId = 101L;
-
-    BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference("1000L")
-        .setName("inputTarget")
-        .build();
-
     List<WindowedValue<String>> outputValues = new ArrayList<>();
-    BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(primitiveTransformId)
-        .setName(Long.toString(outputId))
-        .build();
 
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
-        ImmutableMultimap.of(outputTarget, outputValues::add);
-    Multimap<BeamFnApi.Target,
-             ThrowingConsumer<WindowedValue<BoundedSource<Long>>>> newConsumers =
-             HashMultimap.create();
+    Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
+    consumers.put("outputPC",
+        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) outputValues::add);
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
-    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId("1L")
+    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
         .setUrn(JAVA_SOURCE_URN)
-        .setData(Any.pack(BytesValue.newBuilder()
+        .setParameter(Any.pack(BytesValue.newBuilder()
             .setValue(ByteString.copyFrom(
                 SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
             .build()))
         .build();
 
-    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
-        .setId(primitiveTransformId)
-        .setFunctionSpec(functionSpec)
-        .putInputs("input",
-            BeamFnApi.Target.List.newBuilder().addTarget(inputTarget).build())
-        .putOutputs(Long.toString(outputId),
-            BeamFnApi.PCollection.newBuilder().setCoderReference(LONG_CODER_SPEC_ID).build())
+    RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
+        .setSpec(functionSpec)
+        .putInputs("input", "inputPC")
+        .putOutputs("output", "outputPC")
         .build();
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -489,11 +446,12 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient);
 
-    handler.createConsumersForPrimitiveTransform(
-        primitiveTransform,
+    handler.createRunnerForPTransform(
+        "pTransformId",
+        pTransform,
         Suppliers.ofInstance("57L")::get,
-        existingConsumers::get,
-        newConsumers::put,
+        ImmutableMap.of(),
+        consumers,
         startFunctions::add,
         finishFunctions::add);
 
@@ -507,8 +465,8 @@ public class ProcessBundleHandlerTest {
     outputValues.clear();
 
     // Check that when passing a source along as an input, the source is processed.
-    assertEquals(newConsumers.keySet(), ImmutableSet.of(inputTarget));
-    Iterables.getOnlyElement(newConsumers.get(inputTarget)).accept(
+    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
+    Iterables.getOnlyElement(consumers.get("inputPC")).accept(
         valueInGlobalWindow(CountingSource.upTo(2)));
     assertThat(outputValues, contains(
         valueInGlobalWindow(0L),
@@ -520,35 +478,25 @@ public class ProcessBundleHandlerTest {
   @Test
   public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
     Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
-    String bundleId = "57L";
-    String primitiveTransformId = "100L";
-    long outputId = 101L;
+    String bundleId = "57";
+    String outputId = "101";
 
     List<WindowedValue<String>> outputValues = new ArrayList<>();
-    BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(primitiveTransformId)
-        .setName(Long.toString(outputId))
-        .build();
 
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
-        ImmutableMultimap.of(outputTarget, outputValues::add);
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
-        HashMultimap.create();
+    Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
+    consumers.put("outputPC",
+        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) outputValues::add);
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
-    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId("1L")
+    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
         .setUrn(DATA_INPUT_URN)
-        .setData(Any.pack(REMOTE_PORT))
+        .setParameter(Any.pack(REMOTE_PORT))
         .build();
 
-    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
-        .setId(primitiveTransformId)
-        .setFunctionSpec(functionSpec)
-        .putInputs("input", BeamFnApi.Target.List.getDefaultInstance())
-        .putOutputs(Long.toString(outputId),
-            BeamFnApi.PCollection.newBuilder().setCoderReference(STRING_CODER_SPEC_ID).build())
+    RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
+        .setSpec(functionSpec)
+        .putOutputs(outputId, "outputPC")
         .build();
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -556,11 +504,13 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient);
 
-    handler.createConsumersForPrimitiveTransform(
-        primitiveTransform,
+    handler.createRunnerForPTransform(
+        "pTransformId",
+        pTransform,
         Suppliers.ofInstance(bundleId)::get,
-        existingConsumers::get,
-        newConsumers::put,
+        ImmutableMap.of("outputPC",
+            RunnerApi.PCollection.newBuilder().setCoderId(STRING_CODER_SPEC_ID).build()),
+        consumers,
         startFunctions::add,
         finishFunctions::add);
 
@@ -573,8 +523,8 @@ public class ProcessBundleHandlerTest {
     verify(beamFnDataClient).forInboundConsumer(
         eq(REMOTE_PORT.getApiServiceDescriptor()),
         eq(KV.of(bundleId, BeamFnApi.Target.newBuilder()
-            .setPrimitiveTransformReference(primitiveTransformId)
-            .setName("input")
+            .setPrimitiveTransformReference("pTransformId")
+            .setName(outputId)
             .build())),
         eq(STRING_CODER),
         consumerCaptor.capture());
@@ -583,7 +533,7 @@ public class ProcessBundleHandlerTest {
     assertThat(outputValues, contains(valueInGlobalWindow("TestValue")));
     outputValues.clear();
 
-    assertThat(newConsumers.keySet(), empty());
+    assertThat(consumers.keySet(), containsInAnyOrder("outputPC"));
 
     completionFuture.complete(null);
     Iterables.getOnlyElement(finishFunctions).run();
@@ -595,33 +545,20 @@ public class ProcessBundleHandlerTest {
   public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
     Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
     String bundleId = "57L";
-    String primitiveTransformId = "100L";
-    long outputId = 101L;
-
-    BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference("1000L")
-        .setName("inputTarget")
-        .build();
+    String inputId = "100L";
 
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
-        ImmutableMultimap.of();
-    Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
-        HashMultimap.create();
+    Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
-    BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId("1L")
+    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
         .setUrn(DATA_OUTPUT_URN)
-        .setData(Any.pack(REMOTE_PORT))
+        .setParameter(Any.pack(REMOTE_PORT))
         .build();
 
-    BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
-        .setId(primitiveTransformId)
-        .setFunctionSpec(functionSpec)
-        .putInputs("input", BeamFnApi.Target.List.newBuilder().addTarget(inputTarget).build())
-        .putOutputs(Long.toString(outputId),
-            BeamFnApi.PCollection.newBuilder().setCoderReference(STRING_CODER_SPEC_ID).build())
+    RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
+        .setSpec(functionSpec)
+        .putInputs(inputId, "inputPC")
         .build();
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
@@ -629,11 +566,13 @@ public class ProcessBundleHandlerTest {
         fnApiRegistry::get,
         beamFnDataClient);
 
-    handler.createConsumersForPrimitiveTransform(
-        primitiveTransform,
+    handler.createRunnerForPTransform(
+        "ptransformId",
+        pTransform,
         Suppliers.ofInstance(bundleId)::get,
-        existingConsumers::get,
-        newConsumers::put,
+        ImmutableMap.of("inputPC",
+            RunnerApi.PCollection.newBuilder().setCoderId(STRING_CODER_SPEC_ID).build()),
+        consumers,
         startFunctions::add,
         finishFunctions::add);
 
@@ -643,16 +582,16 @@ public class ProcessBundleHandlerTest {
     AtomicBoolean wasCloseCalled = new AtomicBoolean();
     CloseableThrowingConsumer<WindowedValue<String>> outputConsumer =
         new CloseableThrowingConsumer<WindowedValue<String>>(){
-      @Override
-      public void close() throws Exception {
-        wasCloseCalled.set(true);
-      }
+          @Override
+          public void close() throws Exception {
+            wasCloseCalled.set(true);
+          }
 
-      @Override
-      public void accept(WindowedValue<String> t) throws Exception {
-        outputValues.add(t);
-      }
-    };
+          @Override
+          public void accept(WindowedValue<String> t) throws Exception {
+            outputValues.add(t);
+          }
+        };
 
     when(beamFnDataClient.forOutboundConsumer(
         any(),
@@ -662,14 +601,13 @@ public class ProcessBundleHandlerTest {
     verify(beamFnDataClient).forOutboundConsumer(
         eq(REMOTE_PORT.getApiServiceDescriptor()),
         eq(KV.of(bundleId, BeamFnApi.Target.newBuilder()
-            .setPrimitiveTransformReference(primitiveTransformId)
-            .setName(Long.toString(outputId))
+            .setPrimitiveTransformReference("ptransformId")
+            .setName(inputId)
             .build())),
         eq(STRING_CODER));
 
-    assertEquals(newConsumers.keySet(), ImmutableSet.of(inputTarget));
-    Iterables.getOnlyElement(newConsumers.get(inputTarget)).accept(
-        valueInGlobalWindow("TestValue"));
+    assertThat(consumers.keySet(), containsInAnyOrder("inputPC"));
+    Iterables.getOnlyElement(consumers.get("inputPC")).accept(valueInGlobalWindow("TestValue"));
     assertThat(outputValues, contains(valueInGlobalWindow("TestValue")));
     outputValues.clear();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
index c32fcc4..b1f4410 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.fn.harness.test.TestExecutors;
 import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -41,12 +42,21 @@ public class RegisterHandlerTest {
       BeamFnApi.InstructionRequest.newBuilder()
       .setInstructionId("1L")
       .setRegister(BeamFnApi.RegisterRequest.newBuilder()
-          .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("1L")
-              .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder().setId("10L")).build()))
+          .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder()
+              .setId("1L")
+              .putCodersyyy("10L", RunnerApi.Coder.newBuilder()
+                  .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
+                      .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:10L").build())
+                      .build())
+                  .build())
+              .build())
           .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("2L")
-              .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder().setId("20L")).build()))
+              .putCodersyyy("20L", RunnerApi.Coder.newBuilder()
+                  .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
+                      .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:20L").build())
+                      .build())
+                  .build())
+              .build())
           .build())
       .build();
   private static final BeamFnApi.InstructionResponse REGISTER_RESPONSE =
@@ -71,9 +81,11 @@ public class RegisterHandlerTest {
         handler.getById("1L"));
     assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1),
         handler.getById("2L"));
-    assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCoders(0),
+    assertEquals(
+        REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersyyyOrThrow("10L"),
         handler.getById("10L"));
-    assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCoders(0),
+    assertEquals(
+        REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersyyyOrThrow("20L"),
         handler.getById("20L"));
     assertEquals(REGISTER_RESPONSE, responseFuture.get());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index a3d4a1b..7e8ab1a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -71,16 +72,21 @@ public class BeamFnDataReadRunnerTest {
 
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
-  private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
-      .setData(Any.pack(PORT_SPEC)).build();
+  private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
+      .setParameter(Any.pack(PORT_SPEC)).build();
   private static final Coder<WindowedValue<String>> CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
-  private static final BeamFnApi.Coder CODER_SPEC;
+  private static final RunnerApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())))
+      CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
+          RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
+              RunnerApi.FunctionSpec.newBuilder().setParameter(
+                  Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
+                      .build()))
+                  .build())
+              .build())
           .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 3383966..a3c874e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -59,17 +60,22 @@ public class BeamFnDataWriteRunnerTest {
 
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
-  private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
-      .setData(Any.pack(PORT_SPEC)).build();
+  private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
+      .setParameter(Any.pack(PORT_SPEC)).build();
   private static final Coder<WindowedValue<String>> CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
-  private static final BeamFnApi.Coder CODER_SPEC;
+  private static final RunnerApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-      .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-          OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())))
-      .build();
+      CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
+          RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
+              RunnerApi.FunctionSpec.newBuilder().setParameter(
+                  Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
+                      .build()))
+                  .build())
+              .build())
+          .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/9511257a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
index 73860ef..d8ed121 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
@@ -33,7 +33,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -58,7 +58,7 @@ public class BoundedSourceRunnerTest {
     BoundedSourceRunner<BoundedSource<Long>, Long> runner =
         new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
-        BeamFnApi.FunctionSpec.getDefaultInstance(),
+        RunnerApi.FunctionSpec.getDefaultInstance(),
         outputMap);
 
     runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
@@ -81,7 +81,7 @@ public class BoundedSourceRunnerTest {
     BoundedSourceRunner<BoundedSource<Long>, Long> runner =
         new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
-        BeamFnApi.FunctionSpec.getDefaultInstance(),
+        RunnerApi.FunctionSpec.getDefaultInstance(),
         outputMap);
 
     runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
@@ -101,7 +101,7 @@ public class BoundedSourceRunnerTest {
     BoundedSourceRunner<BoundedSource<Long>, Long> runner =
         new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
-        BeamFnApi.FunctionSpec.newBuilder().setData(
+            RunnerApi.FunctionSpec.newBuilder().setParameter(
             Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
         outputMap);
 


[2/2] beam git commit: [BEAM-1347] Migrate to Runner API constructs within the Java SDK harness

Posted by lc...@apache.org.
[BEAM-1347] Migrate to Runner API constructs within the Java SDK harness

This closes #3297


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf2d3005
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf2d3005
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf2d3005

Branch: refs/heads/master
Commit: bf2d30058bc0624ab167b79085695fda1e45f589
Parents: ad2c1f1 9511257
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jun 5 13:06:02 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 5 13:06:02 2017 -0700

----------------------------------------------------------------------
 sdks/java/harness/pom.xml                       |   5 +
 .../harness/control/ProcessBundleHandler.java   | 178 ++++++---
 .../fn/harness/control/RegisterHandler.java     |  12 +-
 .../harness/data/BeamFnDataGrpcMultiplexer.java |   8 +-
 .../beam/runners/core/BeamFnDataReadRunner.java |  12 +-
 .../runners/core/BeamFnDataWriteRunner.java     |  12 +-
 .../beam/runners/core/BoundedSourceRunner.java  |  10 +-
 .../control/ProcessBundleHandlerTest.java       | 400 ++++++++-----------
 .../fn/harness/control/RegisterHandlerTest.java |  26 +-
 .../runners/core/BeamFnDataReadRunnerTest.java  |  18 +-
 .../runners/core/BeamFnDataWriteRunnerTest.java |  20 +-
 .../runners/core/BoundedSourceRunnerTest.java   |   8 +-
 12 files changed, 371 insertions(+), 338 deletions(-)
----------------------------------------------------------------------