You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/11 19:25:59 UTC

[03/12] beam git commit: Move sdks/common/fn-api protos to model/fn-execution

Move sdks/common/fn-api protos to model/fn-execution


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e233af9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e233af9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e233af9a

Branch: refs/heads/master
Commit: e233af9a2ad8c45153315d1daffbc5191eb176a8
Parents: 2f3af31
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 5 14:15:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 11 09:53:37 2017 -0700

----------------------------------------------------------------------
 model/fn-execution/pom.xml                      |  32 +
 .../src/main/proto/beam_fn_api.proto            | 726 +++++++++++++++++++
 .../src/main/proto/beam_provision_api.proto     |  60 ++
 .../org/apache/beam/fn/v1/standard_coders.yaml  | 195 +++++
 pom.xml                                         |  13 -
 runners/apex/pom.xml                            |   2 +-
 runners/core-java/pom.xml                       |   2 +-
 runners/direct-java/pom.xml                     |   2 +-
 runners/flink/pom.xml                           |   2 +-
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 runners/spark/pom.xml                           |   2 +-
 sdks/common/fn-api/pom.xml                      | 114 ---
 .../fn-api/src/main/proto/beam_fn_api.proto     | 726 -------------------
 .../src/main/proto/beam_provision_api.proto     |  60 --
 .../org/apache/beam/fn/v1/standard_coders.yaml  | 195 -----
 sdks/common/pom.xml                             |   1 -
 sdks/java/core/pom.xml                          |   2 +-
 sdks/java/harness/pom.xml                       |  12 +-
 18 files changed, 1026 insertions(+), 1122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/model/fn-execution/pom.xml b/model/fn-execution/pom.xml
index 807feb6..b5b5fdf 100644
--- a/model/fn-execution/pom.xml
+++ b/model/fn-execution/pom.xml
@@ -79,4 +79,36 @@
       </plugin>
     </plugins>
   </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+  </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
new file mode 100644
index 0000000..5a01077
--- /dev/null
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Fn API and boostrapping.
+ *
+ * TODO: Usage of plural names in lists looks awkward in Java
+ * e.g. getOutputsMap, addCodersBuilder
+ *
+ * TODO: gRPC / proto field names conflict with generated code
+ * e.g. "class" in java, "output" in python
+ */
+
+syntax = "proto3";
+
+/* TODO: Consider consolidating common components in another package
+ * and lanaguage namespaces for re-use with Runner Api.
+ */
+
+package org.apache.beam.fn.v1;
+
+option java_package = "org.apache.beam.fn.v1";
+option java_outer_classname = "BeamFnApi";
+
+import "beam_runner_api.proto";
+import "endpoints.proto";
+import "google/protobuf/timestamp.proto";
+
+/*
+ * Constructs that define the pipeline shape.
+ *
+ * These are mostly unstable due to the missing pieces to be shared with
+ * the Runner Api like windowing strategy, display data, .... There are still
+ * some modelling questions related to whether a side input is modelled
+ * as another field on a PrimitiveTransform or as part of inputs and we
+ * still are missing things like the CompositeTransform.
+ */
+
+// A representation of an input or output definition on a primitive transform.
+// Stable
+message Target {
+  // A repeated list of target definitions.
+  message List {
+    repeated Target target = 1;
+  }
+
+  // (Required) The id of the PrimitiveTransform which is the target.
+  string primitive_transform_reference = 1;
+
+  // (Required) The local name of an input or output defined on the primitive
+  // transform.
+  string name = 2;
+}
+
+// A descriptor for connecting to a remote port using the Beam Fn Data API.
+// Allows for communication between two environments (for example between the
+// runner and the SDK).
+// Stable
+message RemoteGrpcPort {
+  // (Required) An API descriptor which describes where to
+  // connect to including any authentication that is required.
+  org.apache.beam.portability.v1.ApiServiceDescriptor api_service_descriptor = 1;
+}
+
+/*
+ * Control Plane API
+ *
+ * Progress reporting and splitting still need further vetting. Also, this may change
+ * with the addition of new types of instructions/responses related to metrics.
+ */
+
+// An API that describes the work that a SDK harness is meant to do.
+// Stable
+service BeamFnControl {
+  // Instructions sent by the runner to the SDK requesting different types
+  // of work.
+  rpc Control(
+    // A stream of responses to instructions the SDK was asked to be performed.
+    stream InstructionResponse
+  ) returns (
+    // A stream of instructions requested of the SDK to be performed.
+    stream InstructionRequest
+  ) {}
+}
+
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
+// Stable
+message InstructionRequest {
+  // (Required) An unique identifier provided by the runner which represents
+  // this requests execution. The InstructionResponse MUST have the matching id.
+  string instruction_id = 1;
+
+  // (Required) A request that the SDK Harness needs to interpret.
+  oneof request {
+    RegisterRequest register = 1000;
+    ProcessBundleRequest process_bundle = 1001;
+    ProcessBundleProgressRequest process_bundle_progress = 1002;
+    ProcessBundleSplitRequest process_bundle_split = 1003;
+  }
+}
+
+// The response for an associated request the SDK had been asked to fulfill.
+// Stable
+message InstructionResponse {
+  // (Required) A reference provided by the runner which represents a requests
+  // execution. The InstructionResponse MUST have the matching id when
+  // responding to the runner.
+  string instruction_id = 1;
+
+  // If this is specified, then this instruction has failed.
+  // A human readable string representing the reason as to why processing has
+  // failed.
+  string error = 2;
+
+  // If the instruction did not fail, it is required to return an equivalent
+  // response type depending on the request this matches.
+  oneof response {
+    RegisterResponse register = 1000;
+    ProcessBundleResponse process_bundle = 1001;
+    ProcessBundleProgressResponse process_bundle_progress = 1002;
+    ProcessBundleSplitResponse process_bundle_split = 1003;
+  }
+}
+
+// A list of objects which can be referred to by the runner in
+// future requests.
+// Stable
+message RegisterRequest {
+  // (Optional) The set of descriptors used to process bundles.
+  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
+}
+
+// Stable
+message RegisterResponse {
+}
+
+// Definitions that should be used to construct the bundle processing graph.
+message ProcessBundleDescriptor {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  string id = 1;
+
+  // (Required) A map from pipeline-scoped id to PTransform.
+  map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
+
+  // (Required) A map from pipeline-scoped id to PCollection.
+  map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
+
+  // (Required) A map from pipeline-scoped id to WindowingStrategy.
+  map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4;
+
+  // (Required) A map from pipeline-scoped id to Coder.
+  map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
+
+  // (Required) A map from pipeline-scoped id to Environment.
+  map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
+
+  // A descriptor describing the end point to use for State API
+  // calls. Required if the Runner intends to send remote references over the
+  // data plane or if any of the transforms rely on user state or side inputs.
+  org.apache.beam.portability.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
+}
+
+// A request to process a given bundle.
+// Stable
+message ProcessBundleRequest {
+  // (Required) A reference to the process bundle descriptor that must be
+  // instantiated and executed by the SDK harness.
+  string process_bundle_descriptor_reference = 1;
+
+  // (Optional) A list of cache tokens that can be used by an SDK to reuse
+  // cached data returned by the State API across multiple bundles.
+  repeated bytes cache_tokens = 2;
+}
+
+// Stable
+message ProcessBundleResponse {
+  // (Optional) If metrics reporting is supported by the SDK, this represents
+  // the final metrics to record for this bundle.
+  Metrics metrics = 1;
+}
+
+// A request to report progress information for a given bundle.
+// This is an optional request to be handled and is used to support advanced
+// SDK features such as SplittableDoFn, user level metrics etc.
+message ProcessBundleProgressRequest {
+  // (Required) A reference to an active process bundle request with the given
+  // instruction id.
+  string instruction_reference = 1;
+}
+
+message Metrics {
+  // PTransform level metrics.
+  // These metrics are split into processed and active element groups for
+  // progress reporting purposes. This allows a Runner to see what is measured,
+  // what is estimated and what can be extrapolated to be able to accurately
+  // estimate the backlog of remaining work.
+  message PTransform {
+    // Metrics that are measured for processed and active element groups.
+    message Measured {
+      // (Required) Map from local input name to number of elements processed
+      // from this input.
+      map<string, int64> input_element_counts = 1;
+
+      // (Required) Map from local output name to number of elements produced
+      // for this output.
+      map<string, int64> output_element_counts = 2;
+
+      // (Optional) The total time spent so far in processing the elements in
+      // this group.
+      int64 total_time_spent = 3;
+
+      // TODO: Add other element group level metrics.
+    }
+
+    // Metrics for fully processed elements.
+    message ProcessedElements {
+      // (Required)
+      Measured measured = 1;
+    }
+
+    // Metrics for active elements.
+    // An element is considered active if the SDK has started but not finished
+    // processing it yet.
+    message ActiveElements {
+      // (Required)
+      Measured measured = 1;
+
+      // Estimated metrics.
+
+      // (Optional) Sum of estimated fraction of known work remaining for all
+      // active elements, as reported by this transform.
+      // If not reported, a Runner could extrapolate this from the processed
+      // elements.
+      // TODO: Handle the case when known work is infinite.
+      double fraction_remaining = 2;
+
+      // (Optional) Map from local output name to sum of estimated number
+      // of elements remaining for this output from all active elements,
+      // as reported by this transform.
+      // If not reported, a Runner could extrapolate this from the processed
+      // elements.
+      map<string, int64> output_elements_remaining = 3;
+    }
+
+    // (Required): Metrics for processed elements.
+    ProcessedElements processed_elements = 1;
+    // (Required): Metrics for active elements.
+    ActiveElements active_elements = 2;
+
+    // (Optional): Map from local output name to its watermark.
+    // The watermarks reported are tentative, to get a better sense of progress
+    // while processing a bundle but before it is committed. At bundle commit
+    // time, a Runner needs to also take into account the timers set to compute
+    // the actual watermarks.
+    map<string, int64> watermarks = 3;
+
+    // TODO: Define other transform level system metrics.
+  }
+
+  // User defined metrics
+  message User {
+    // TODO: Define it.
+  }
+
+  map<string, PTransform> ptransforms = 1;
+  map<string, User> user = 2;
+}
+
+message ProcessBundleProgressResponse {
+  // (Required)
+  Metrics metrics = 1;
+}
+
+message ProcessBundleSplitRequest {
+  // (Required) A reference to an active process bundle request with the given
+  // instruction id.
+  string instruction_reference = 1;
+
+  // (Required) The fraction of work (when compared to the known amount of work)
+  // the process bundle request should try to split at.
+  double fraction = 2;
+}
+
+// urn:org.apache.beam:restriction:element-count:1.0
+message ElementCountRestriction {
+  // A restriction representing the number of elements that should be processed.
+  // Effectively the range [0, count]
+  int64 count = 1;
+}
+
+// urn:org.apache.beam:restriction:element-count-skip:1.0
+message ElementCountSkipRestriction {
+  // A restriction representing the number of elements that should be skipped.
+  // Effectively the range (count, infinity]
+  int64 count = 1;
+}
+
+// Each primitive transform that is splittable is defined by a restriction
+// it is currently processing. During splitting, that currently active
+// restriction (R_initial) is split into 2 components:
+//   * a restriction (R_done) representing all elements that will be fully
+//     processed
+//   * a restriction (R_todo) representing all elements that will not be fully
+//     processed
+//
+// where:
+//   R_initial = R_done ⋃ R_todo
+message PrimitiveTransformSplit {
+  // (Required) A reference to a primitive transform with the given id that
+  // is part of the active process bundle request with the given instruction
+  // id.
+  string primitive_transform_reference = 1;
+
+  // (Required) A function specification describing the restriction
+  // that has been completed by the primitive transform.
+  //
+  // For example, a remote GRPC source will have a specific urn and data
+  // block containing an ElementCountRestriction.
+  org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
+
+  // (Required) A function specification describing the restriction
+  // representing the remainder of work for the primitive transform.
+  //
+  // FOr example, a remote GRPC source will have a specific urn and data
+  // block contain an ElemntCountSkipRestriction.
+  org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
+}
+
+message ProcessBundleSplitResponse {
+  // (Optional) A set of split responses for a currently active work item.
+  //
+  // If primitive transform B is a descendant of primitive transform A and both
+  // A and B report a split. Then B's restriction is reported as an element
+  // restriction pair and thus the fully reported restriction is:
+  //   R = A_done
+  //     ⋃ (A_boundary ⋂ B_done)
+  //     ⋃ (A_boundary ⋂ B_todo)
+  //     ⋃ A_todo
+  // If there is a decendant of B named C, then C would similarly report a
+  // set of element pair restrictions.
+  //
+  // This restriction is processed and completed by the currently active process
+  // bundle request:
+  //   A_done ⋃ (A_boundary ⋂ B_done)
+  // and these restrictions will be processed by future process bundle requests:
+  //   A_boundary ⋂ B_todo (passed to SDF B directly)
+  //   A_todo (passed to SDF A directly)
+
+  // If primitive transform B and C are siblings and descendants of A and A, B,
+  // and C report a split. Then B and C's restrictions are relative to A's.
+  //   R = A_done
+  //     ⋃ (A_boundary ⋂ B_done)
+  //     ⋃ (A_boundary ⋂ B_todo)
+  //     ⋃ (A_boundary ⋂ B_todo)
+  //     ⋃ (A_boundary ⋂ C_todo)
+  //     ⋃ A_todo
+  // If there is no descendant of B or C also reporting a split, than
+  //   B_boundary = ∅ and C_boundary = ∅
+  //
+  // This restriction is processed and completed by the currently active process
+  // bundle request:
+  //   A_done ⋃ (A_boundary ⋂ B_done)
+  //          ⋃ (A_boundary ⋂ C_done)
+  // and these restrictions will be processed by future process bundle requests:
+  //   A_boundary ⋂ B_todo (passed to SDF B directly)
+  //   A_boundary ⋂ C_todo (passed to SDF C directly)
+  //   A_todo (passed to SDF A directly)
+  //
+  // Note that descendants splits should only be reported if it is inexpensive
+  // to compute the boundary restriction intersected with descendants splits.
+  // Also note, that the boundary restriction may represent a set of elements
+  // produced by a parent primitive transform which can not be split at each
+  // element or that there are intermediate unsplittable primitive transforms
+  // between an ancestor splittable function and a descendant splittable
+  // function which may have more than one output per element. Finally note
+  // that the descendant splits should only be reported if the split
+  // information is relatively compact.
+  repeated PrimitiveTransformSplit splits = 1;
+}
+
+/*
+ * Data Plane API
+ */
+
+// Messages used to represent logical byte streams.
+// Stable
+message Elements {
+  // Represents multiple encoded elements in nested context for a given named
+  // instruction and target.
+  message Data {
+    // (Required) A reference to an active instruction request with the given
+    // instruction id.
+    string instruction_reference = 1;
+
+    // (Required) A definition representing a consumer or producer of this data.
+    // If received by a harness, this represents the consumer within that
+    // harness that should consume these bytes. If sent by a harness, this
+    // represents the producer of these bytes.
+    //
+    // Note that a single element may span multiple Data messages.
+    //
+    // Note that a sending/receiving pair should share the same target
+    // identifier.
+    Target target = 2;
+
+    // (Optional) Represents a part of a logical byte stream. Elements within
+    // the logical byte stream are encoded in the nested context and
+    // concatenated together.
+    //
+    // An empty data block represents the end of stream for the given
+    // instruction and target.
+    bytes data = 3;
+  }
+
+  // (Required) A list containing parts of logical byte streams.
+  repeated Data data = 1;
+}
+
+// Stable
+service BeamFnData {
+  // Used to send data between harnesses.
+  rpc Data(
+    // A stream of data representing input.
+    stream Elements
+  ) returns (
+    // A stream of data representing output.
+    stream Elements
+  ) {}
+}
+
+/*
+ * State API
+ */
+
+message StateRequest {
+  // (Required) An unique identifier provided by the SDK which represents this
+  // requests execution. The StateResponse corresponding with this request
+  // will have the matching id.
+  string id = 1;
+
+  // (Required) The associated instruction id of the work that is currently
+  // being processed. This allows for the runner to associate any modifications
+  // to state to be committed with the appropriate work execution.
+  string instruction_reference = 2;
+
+  // (Required) The state key this request is for.
+  StateKey state_key = 3;
+
+  // (Required) The action to take on this request.
+  oneof request {
+    // A request to get state.
+    StateGetRequest get = 1000;
+
+    // A request to append to state.
+    StateAppendRequest append = 1001;
+
+    // A request to clear state.
+    StateClearRequest clear = 1002;
+  }
+}
+
+message StateResponse {
+  // (Required) A reference provided by the SDK which represents a requests
+  // execution. The StateResponse must have the matching id when responding
+  // to the SDK.
+  string id = 1;
+
+  // (Optional) If this is specified, then the state request has failed.
+  // A human readable string representing the reason as to why the request
+  // failed.
+  string error = 2;
+
+  // (Optional) If this is specified, then the result of this state request
+  // can be cached using the supplied token.
+  bytes cache_token = 3;
+
+  // A corresponding response matching the request will be populated.
+  oneof response {
+    // A response to getting state.
+    StateGetResponse get = 1000;
+
+    // A response to appending to state.
+    StateAppendResponse append = 1001;
+
+    // A response to clearing state.
+    StateClearResponse clear = 1002;
+  }
+}
+
+service BeamFnState {
+  // Used to get/append/clear state stored by the runner on behalf of the SDK.
+  rpc State(
+    // A stream of state instructions requested of the runner.
+    stream StateRequest
+  ) returns (
+    // A stream of responses to state instructions the runner was asked to be
+    // performed.
+    stream StateResponse
+  ) {}
+}
+
+message StateKey {
+  message Runner {
+    // (Required) Opaque information supplied by the runner. Used to support
+    // remote references.
+    bytes key = 1;
+  }
+
+  message MultimapSideInput {
+    // (Required) The id of the PTransform containing a side input.
+    string ptransform_id = 1;
+    // (Required) The id of the side input.
+    string side_input_id = 2;
+    // (Required) The window (after mapping the currently executing elements
+    // window into the side input windows domain) encoded in a nested context.
+    bytes window = 3;
+    // (Required) The key encoded in a nested context.
+    bytes key = 4;
+  }
+
+  message BagUserState {
+    // (Required) The id of the PTransform containing user state.
+    string ptransform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context. 
+    bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;
+  }
+
+  // (Required) One of the following state keys must be set.
+  oneof type {
+    Runner runner = 1;
+    MultimapSideInput multimap_side_input = 2;
+    BagUserState bag_user_state = 3;
+    // TODO: represent a state key for user map state
+  }
+}
+
+// A request to get state.
+message StateGetRequest {
+  // (Optional) If specified, signals to the runner that the response
+  // should resume from the following continuation token.
+  //
+  // If unspecified, signals to the runner that the response should start
+  // from the beginning of the logical continuable stream.
+  bytes continuation_token = 1;
+}
+
+// A response to get state representing a logical byte stream which can be
+// continued using the state API.
+message StateGetResponse {
+  // (Optional) If specified, represents a token which can be used with the
+  // state API to get the next chunk of this logical byte stream. The end of
+  // the logical byte stream is signalled by this field being unset.
+  bytes continuation_token = 1;
+
+  // Represents a part of a logical byte stream. Elements within
+  // the logical byte stream are encoded in the nested context and
+  // concatenated together.
+  bytes data = 2;
+}
+
+// A request to append state.
+message StateAppendRequest {
+  // Represents a part of a logical byte stream. Elements within
+  // the logical byte stream are encoded in the nested context and
+  // multiple append requests are concatenated together.
+  bytes data = 1;
+}
+
+// A response to append state.
+message StateAppendResponse {
+}
+
+// A request to clear state.
+message StateClearRequest {
+}
+
+// A response to clear state.
+message StateClearResponse {
+}
+
+/*
+ * Logging API
+ *
+ * This is very stable. There can be some changes to how we define a LogEntry,
+ * to increase/decrease the severity types, the way we format an exception/stack
+ * trace, or the log site.
+ */
+
+// A log entry
+message LogEntry {
+  // A list of log entries, enables buffering and batching of multiple
+  // log messages using the logging API.
+  message List {
+    // (Required) One or or more log messages.
+    repeated LogEntry log_entries = 1;
+  }
+
+  // The severity of the event described in a log entry, expressed as one of the
+  // severity levels listed below. For your reference, the levels are
+  // assigned the listed numeric values. The effect of using numeric values
+  // other than those listed is undefined.
+  //
+  // If you are writing log entries, you should map other severity encodings to
+  // one of these standard levels. For example, you might map all of
+  // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
+  //
+  // This list is intentionally not comprehensive; the intent is to provide a
+  // common set of "good enough" severity levels so that logging front ends
+  // can provide filtering and searching across log types. Users of the API are
+  // free not to use all severity levels in their log messages.
+  message Severity {
+    enum Enum {
+      UNSPECIFIED = 0;
+      // Trace level information, also the default log level unless
+      // another severity is specified.
+      TRACE = 1;
+      // Debugging information.
+      DEBUG = 2;
+      // Normal events.
+      INFO = 3;
+      // Normal but significant events, such as start up, shut down, or
+      // configuration.
+      NOTICE = 4;
+      // Warning events might cause problems.
+      WARN = 5;
+      // Error events are likely to cause problems.
+      ERROR = 6;
+      // Critical events cause severe problems or brief outages and may
+      // indicate that a person must take action.
+      CRITICAL = 7;
+    }
+  }
+
+  // (Required) The severity of the log statement.
+  Severity.Enum severity = 1;
+
+  // (Required) The time at which this log statement occurred.
+  google.protobuf.Timestamp timestamp = 2;
+
+  // (Required) A human readable message.
+  string message = 3;
+
+  // (Optional) An optional trace of the functions involved. For example, in
+  // Java this can include multiple causes and multiple suppressed exceptions.
+  string trace = 4;
+
+  // (Optional) A reference to the instruction this log statement is associated
+  // with.
+  string instruction_reference = 5;
+
+  // (Optional) A reference to the primitive transform this log statement is
+  // associated with.
+  string primitive_transform_reference = 6;
+
+  // (Optional) Human-readable name of the function or method being invoked,
+  // with optional context such as the class or package name. The format can
+  // vary by language. For example:
+  //   qual.if.ied.Class.method (Java)
+  //   dir/package.func (Go)
+  //   module.function (Python)
+  //   file.cc:382 (C++)
+  string log_location = 7;
+
+  // (Optional) The name of the thread this log statement is associated with.
+  string thread = 8;
+}
+
+message LogControl {
+}
+
+// Stable
+service BeamFnLogging {
+  // Allows for the SDK to emit log entries which the runner can
+  // associate with the active job.
+  rpc Logging(
+    // A stream of log entries batched into lists emitted by the SDK harness.
+    stream LogEntry.List
+  ) returns (
+    // A stream of log control messages used to configure the SDK.
+    stream LogControl
+  ) {}
+}
+
+/*
+ * Environment types
+ */
+// A Docker container configuration for launching the SDK harness to execute
+// user specified functions.
+message DockerContainer {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  string id = 1;
+
+  // (Required) The Docker container URI
+  // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
+  string uri = 2;
+
+  // (Optional) Docker registry specification.
+  // If unspecified, the uri is expected to be able to be fetched without
+  // requiring additional configuration by a runner.
+  string registry_reference = 3;
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/src/main/proto/beam_provision_api.proto
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/main/proto/beam_provision_api.proto b/model/fn-execution/src/main/proto/beam_provision_api.proto
new file mode 100644
index 0000000..b0cd6b4
--- /dev/null
+++ b/model/fn-execution/src/main/proto/beam_provision_api.proto
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Provision API, for communicating with a runner
+ * for job and environment provisioning information over GRPC.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.fn.v1;
+
+option java_package = "org.apache.beam.fn.v1";
+option java_outer_classname = "ProvisionApi";
+
+import "google/protobuf/struct.proto";
+
+// A service to provide runtime provisioning information to the SDK harness
+// worker instances -- such as pipeline options, resource constraints and
+// other job metadata -- needed by an SDK harness instance to initialize.
+service ProvisionService {
+    // Get provision information for the SDK harness worker instance.
+    rpc GetProvisionInfo(GetProvisionInfoRequest) returns (GetProvisionInfoResponse);
+}
+
+// A request to get the provision info of a SDK harness worker instance.
+message GetProvisionInfoRequest { }
+
+// A response containing the provision info of a SDK harness worker instance.
+message GetProvisionInfoResponse {
+    ProvisionInfo info = 1;
+}
+
+// Runtime provisioning information for a SDK harness worker instance,
+// such as pipeline options, resource constraints and other job metadata
+message ProvisionInfo {
+    // (required) The job ID.
+    string job_id = 1;
+    // (required) The job name.
+    string job_name = 2;
+
+    // (required) Pipeline options. For non-template jobs, the options are
+    // identical to what is passed to job submission.
+    google.protobuf.Struct pipeline_options = 3;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
new file mode 100644
index 0000000..f37b2d3
--- /dev/null
+++ b/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is broken into multiple sections delimited by ---. Each section specifies a set of
+# reference encodings for a single standardized coder used in a specific context.
+#
+# Each section contains up to 3 properties:
+#
+#   coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
+#   nested: a boolean meaning whether the coder was used in the nested context. Missing means to
+#           test both contexts, a shorthand for when the coder is invariant across context.
+#   examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
+#             The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
+#             one of a few standard JSON types such as numbers, strings, dicts that map naturally
+#             to the type encoded by the coder.
+#
+# These choices were made to strike a balance between portability, ease of use, and simple
+# legibility of this file itself.
+#
+# It is expected that future work will move the `coder` field into a format that it would be
+# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
+#
+# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
+
+
+coder:
+  urn: "urn:beam:coders:bytes:0.1"
+nested: false
+examples:
+  "abc": abc
+  "ab\0c": "ab\0c"
+
+---
+
+coder:
+  urn: "urn:beam:coders:bytes:0.1"
+nested: true
+examples:
+  "\u0003abc": abc
+  "\u0004ab\0c": "ab\0c"
+  "\u00c8\u0001       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|":
+              "       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|"
+
+---
+
+coder:
+  urn: "urn:beam:coders:varint:0.1"
+examples:
+  "\0": 0
+  "\u0001": 1
+  "\u000A": 10
+  "\u00c8\u0001": 200
+  "\u00e8\u0007": 1000
+  "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
+
+---
+
+coder:
+  urn: "urn:beam:coders:kv:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"},
+               {urn: "urn:beam:coders:varint:0.1"}]
+examples:
+  "\u0003abc\0": {key: abc, value: 0}
+  "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
+
+---
+
+coder:
+  urn: "urn:beam:coders:kv:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"},
+               {urn: "urn:beam:coders:bytes:0.1"}]
+nested: false
+examples:
+  "\u0003abcdef": {key: abc, value: def}
+  "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+  urn: "urn:beam:coders:kv:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"},
+               {urn: "urn:beam:coders:bytes:0.1"}]
+nested: true
+examples:
+  "\u0003abc\u0003def": {key: abc, value: def}
+  "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+  urn: "urn:beam:coders:interval_window:0.1"
+examples:
+  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
+  "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
+  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
+  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"}]
+examples:
+  "\0\0\0\u0001\0": [0]
+  "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
+  "\0\0\0\0": []
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"}]
+examples:
+  "\0\0\0\u0001\u0003abc": ["abc"]
+  "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
+  "\0\0\0\0": []
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"}]
+  # This is for iterables of unknown length, where the encoding is not
+  # deterministic.
+  non_deterministic: True
+examples:
+  "\u00ff\u00ff\u00ff\u00ff\u0000": []
+  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
+  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\0\0\0\u0001": [""]
+
+---
+
+coder:
+  urn: "urn:beam:coders:global_window:0.1"
+examples:
+  "": ""
+
+---
+
+# All windowed values consist of pane infos that represent NO_FIRING until full support is added
+# in the Python SDK (BEAM-1522).
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
+    value: 2,
+    timestamp: 1454293425000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: ["global"]
+  }
+
+---
+
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:interval_window:0.1"}]
+examples:
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
+    value: 4,
+    timestamp: -400000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 280000}]
+  }
+
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
+    value: 2,
+    timestamp: -100,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
+  }

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 212d703..efedb1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,19 +428,6 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-common-fn-api</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-common-fn-api</artifactId>
-        <version>${project.version}</version>
-        <type>test-jar</type>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-core</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8ade583..f70e67e 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -179,7 +179,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 668c08c..087e24d 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -69,7 +69,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 4e28438..6e356fc 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -327,7 +327,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index effbdb4..e77dbc8 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -371,7 +371,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 873bb2e..da0d237 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -466,7 +466,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 3e9095a..0ba6125 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -333,7 +333,7 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
deleted file mode 100644
index 498f83a..0000000
--- a/sdks/common/fn-api/pom.xml
+++ /dev/null
@@ -1,114 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <packaging>jar</packaging>
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-sdks-common-parent</artifactId>
-    <version>2.3.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-sdks-common-fn-api</artifactId>
-  <name>Apache Beam :: SDKs :: Common :: Fn API</name>
-  <description>This artifact generates the stub bindings.</description>
-
-  <build>
-    <resources>
-      <resource>
-        <directory>src/test/resources</directory>
-        <filtering>true</filtering>
-      </resource>
-      <resource>
-        <directory>${project.build.directory}/original_sources_to_package</directory>
-      </resource>
-    </resources>
-
-    <plugins>
-      <!-- Skip the checkstyle plugin on generated code -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-
-      <!-- Skip the findbugs plugin on generated code -->
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.xolstice.maven.plugins</groupId>
-        <artifactId>protobuf-maven-plugin</artifactId>
-        <configuration>
-          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
-          <pluginId>grpc-java</pluginId>
-          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-              <goal>compile-custom</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-model-pipeline</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-protobuf</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-stub</artifactId>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
deleted file mode 100644
index 5a01077..0000000
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ /dev/null
@@ -1,726 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Fn API and boostrapping.
- *
- * TODO: Usage of plural names in lists looks awkward in Java
- * e.g. getOutputsMap, addCodersBuilder
- *
- * TODO: gRPC / proto field names conflict with generated code
- * e.g. "class" in java, "output" in python
- */
-
-syntax = "proto3";
-
-/* TODO: Consider consolidating common components in another package
- * and lanaguage namespaces for re-use with Runner Api.
- */
-
-package org.apache.beam.fn.v1;
-
-option java_package = "org.apache.beam.fn.v1";
-option java_outer_classname = "BeamFnApi";
-
-import "beam_runner_api.proto";
-import "endpoints.proto";
-import "google/protobuf/timestamp.proto";
-
-/*
- * Constructs that define the pipeline shape.
- *
- * These are mostly unstable due to the missing pieces to be shared with
- * the Runner Api like windowing strategy, display data, .... There are still
- * some modelling questions related to whether a side input is modelled
- * as another field on a PrimitiveTransform or as part of inputs and we
- * still are missing things like the CompositeTransform.
- */
-
-// A representation of an input or output definition on a primitive transform.
-// Stable
-message Target {
-  // A repeated list of target definitions.
-  message List {
-    repeated Target target = 1;
-  }
-
-  // (Required) The id of the PrimitiveTransform which is the target.
-  string primitive_transform_reference = 1;
-
-  // (Required) The local name of an input or output defined on the primitive
-  // transform.
-  string name = 2;
-}
-
-// A descriptor for connecting to a remote port using the Beam Fn Data API.
-// Allows for communication between two environments (for example between the
-// runner and the SDK).
-// Stable
-message RemoteGrpcPort {
-  // (Required) An API descriptor which describes where to
-  // connect to including any authentication that is required.
-  org.apache.beam.portability.v1.ApiServiceDescriptor api_service_descriptor = 1;
-}
-
-/*
- * Control Plane API
- *
- * Progress reporting and splitting still need further vetting. Also, this may change
- * with the addition of new types of instructions/responses related to metrics.
- */
-
-// An API that describes the work that a SDK harness is meant to do.
-// Stable
-service BeamFnControl {
-  // Instructions sent by the runner to the SDK requesting different types
-  // of work.
-  rpc Control(
-    // A stream of responses to instructions the SDK was asked to be performed.
-    stream InstructionResponse
-  ) returns (
-    // A stream of instructions requested of the SDK to be performed.
-    stream InstructionRequest
-  ) {}
-}
-
-// A request sent by a runner which the SDK is asked to fulfill.
-// For any unsupported request type, an error should be returned with a
-// matching instruction id.
-// Stable
-message InstructionRequest {
-  // (Required) An unique identifier provided by the runner which represents
-  // this requests execution. The InstructionResponse MUST have the matching id.
-  string instruction_id = 1;
-
-  // (Required) A request that the SDK Harness needs to interpret.
-  oneof request {
-    RegisterRequest register = 1000;
-    ProcessBundleRequest process_bundle = 1001;
-    ProcessBundleProgressRequest process_bundle_progress = 1002;
-    ProcessBundleSplitRequest process_bundle_split = 1003;
-  }
-}
-
-// The response for an associated request the SDK had been asked to fulfill.
-// Stable
-message InstructionResponse {
-  // (Required) A reference provided by the runner which represents a requests
-  // execution. The InstructionResponse MUST have the matching id when
-  // responding to the runner.
-  string instruction_id = 1;
-
-  // If this is specified, then this instruction has failed.
-  // A human readable string representing the reason as to why processing has
-  // failed.
-  string error = 2;
-
-  // If the instruction did not fail, it is required to return an equivalent
-  // response type depending on the request this matches.
-  oneof response {
-    RegisterResponse register = 1000;
-    ProcessBundleResponse process_bundle = 1001;
-    ProcessBundleProgressResponse process_bundle_progress = 1002;
-    ProcessBundleSplitResponse process_bundle_split = 1003;
-  }
-}
-
-// A list of objects which can be referred to by the runner in
-// future requests.
-// Stable
-message RegisterRequest {
-  // (Optional) The set of descriptors used to process bundles.
-  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
-}
-
-// Stable
-message RegisterResponse {
-}
-
-// Definitions that should be used to construct the bundle processing graph.
-message ProcessBundleDescriptor {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1;
-
-  // (Required) A map from pipeline-scoped id to PTransform.
-  map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
-
-  // (Required) A map from pipeline-scoped id to PCollection.
-  map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
-
-  // (Required) A map from pipeline-scoped id to WindowingStrategy.
-  map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4;
-
-  // (Required) A map from pipeline-scoped id to Coder.
-  map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
-
-  // (Required) A map from pipeline-scoped id to Environment.
-  map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
-
-  // A descriptor describing the end point to use for State API
-  // calls. Required if the Runner intends to send remote references over the
-  // data plane or if any of the transforms rely on user state or side inputs.
-  org.apache.beam.portability.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
-}
-
-// A request to process a given bundle.
-// Stable
-message ProcessBundleRequest {
-  // (Required) A reference to the process bundle descriptor that must be
-  // instantiated and executed by the SDK harness.
-  string process_bundle_descriptor_reference = 1;
-
-  // (Optional) A list of cache tokens that can be used by an SDK to reuse
-  // cached data returned by the State API across multiple bundles.
-  repeated bytes cache_tokens = 2;
-}
-
-// Stable
-message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  Metrics metrics = 1;
-}
-
-// A request to report progress information for a given bundle.
-// This is an optional request to be handled and is used to support advanced
-// SDK features such as SplittableDoFn, user level metrics etc.
-message ProcessBundleProgressRequest {
-  // (Required) A reference to an active process bundle request with the given
-  // instruction id.
-  string instruction_reference = 1;
-}
-
-message Metrics {
-  // PTransform level metrics.
-  // These metrics are split into processed and active element groups for
-  // progress reporting purposes. This allows a Runner to see what is measured,
-  // what is estimated and what can be extrapolated to be able to accurately
-  // estimate the backlog of remaining work.
-  message PTransform {
-    // Metrics that are measured for processed and active element groups.
-    message Measured {
-      // (Required) Map from local input name to number of elements processed
-      // from this input.
-      map<string, int64> input_element_counts = 1;
-
-      // (Required) Map from local output name to number of elements produced
-      // for this output.
-      map<string, int64> output_element_counts = 2;
-
-      // (Optional) The total time spent so far in processing the elements in
-      // this group.
-      int64 total_time_spent = 3;
-
-      // TODO: Add other element group level metrics.
-    }
-
-    // Metrics for fully processed elements.
-    message ProcessedElements {
-      // (Required)
-      Measured measured = 1;
-    }
-
-    // Metrics for active elements.
-    // An element is considered active if the SDK has started but not finished
-    // processing it yet.
-    message ActiveElements {
-      // (Required)
-      Measured measured = 1;
-
-      // Estimated metrics.
-
-      // (Optional) Sum of estimated fraction of known work remaining for all
-      // active elements, as reported by this transform.
-      // If not reported, a Runner could extrapolate this from the processed
-      // elements.
-      // TODO: Handle the case when known work is infinite.
-      double fraction_remaining = 2;
-
-      // (Optional) Map from local output name to sum of estimated number
-      // of elements remaining for this output from all active elements,
-      // as reported by this transform.
-      // If not reported, a Runner could extrapolate this from the processed
-      // elements.
-      map<string, int64> output_elements_remaining = 3;
-    }
-
-    // (Required): Metrics for processed elements.
-    ProcessedElements processed_elements = 1;
-    // (Required): Metrics for active elements.
-    ActiveElements active_elements = 2;
-
-    // (Optional): Map from local output name to its watermark.
-    // The watermarks reported are tentative, to get a better sense of progress
-    // while processing a bundle but before it is committed. At bundle commit
-    // time, a Runner needs to also take into account the timers set to compute
-    // the actual watermarks.
-    map<string, int64> watermarks = 3;
-
-    // TODO: Define other transform level system metrics.
-  }
-
-  // User defined metrics
-  message User {
-    // TODO: Define it.
-  }
-
-  map<string, PTransform> ptransforms = 1;
-  map<string, User> user = 2;
-}
-
-message ProcessBundleProgressResponse {
-  // (Required)
-  Metrics metrics = 1;
-}
-
-message ProcessBundleSplitRequest {
-  // (Required) A reference to an active process bundle request with the given
-  // instruction id.
-  string instruction_reference = 1;
-
-  // (Required) The fraction of work (when compared to the known amount of work)
-  // the process bundle request should try to split at.
-  double fraction = 2;
-}
-
-// urn:org.apache.beam:restriction:element-count:1.0
-message ElementCountRestriction {
-  // A restriction representing the number of elements that should be processed.
-  // Effectively the range [0, count]
-  int64 count = 1;
-}
-
-// urn:org.apache.beam:restriction:element-count-skip:1.0
-message ElementCountSkipRestriction {
-  // A restriction representing the number of elements that should be skipped.
-  // Effectively the range (count, infinity]
-  int64 count = 1;
-}
-
-// Each primitive transform that is splittable is defined by a restriction
-// it is currently processing. During splitting, that currently active
-// restriction (R_initial) is split into 2 components:
-//   * a restriction (R_done) representing all elements that will be fully
-//     processed
-//   * a restriction (R_todo) representing all elements that will not be fully
-//     processed
-//
-// where:
-//   R_initial = R_done ⋃ R_todo
-message PrimitiveTransformSplit {
-  // (Required) A reference to a primitive transform with the given id that
-  // is part of the active process bundle request with the given instruction
-  // id.
-  string primitive_transform_reference = 1;
-
-  // (Required) A function specification describing the restriction
-  // that has been completed by the primitive transform.
-  //
-  // For example, a remote GRPC source will have a specific urn and data
-  // block containing an ElementCountRestriction.
-  org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
-
-  // (Required) A function specification describing the restriction
-  // representing the remainder of work for the primitive transform.
-  //
-  // FOr example, a remote GRPC source will have a specific urn and data
-  // block contain an ElemntCountSkipRestriction.
-  org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
-}
-
-message ProcessBundleSplitResponse {
-  // (Optional) A set of split responses for a currently active work item.
-  //
-  // If primitive transform B is a descendant of primitive transform A and both
-  // A and B report a split. Then B's restriction is reported as an element
-  // restriction pair and thus the fully reported restriction is:
-  //   R = A_done
-  //     ⋃ (A_boundary ⋂ B_done)
-  //     ⋃ (A_boundary ⋂ B_todo)
-  //     ⋃ A_todo
-  // If there is a decendant of B named C, then C would similarly report a
-  // set of element pair restrictions.
-  //
-  // This restriction is processed and completed by the currently active process
-  // bundle request:
-  //   A_done ⋃ (A_boundary ⋂ B_done)
-  // and these restrictions will be processed by future process bundle requests:
-  //   A_boundary ⋂ B_todo (passed to SDF B directly)
-  //   A_todo (passed to SDF A directly)
-
-  // If primitive transform B and C are siblings and descendants of A and A, B,
-  // and C report a split. Then B and C's restrictions are relative to A's.
-  //   R = A_done
-  //     ⋃ (A_boundary ⋂ B_done)
-  //     ⋃ (A_boundary ⋂ B_todo)
-  //     ⋃ (A_boundary ⋂ B_todo)
-  //     ⋃ (A_boundary ⋂ C_todo)
-  //     ⋃ A_todo
-  // If there is no descendant of B or C also reporting a split, than
-  //   B_boundary = ∅ and C_boundary = ∅
-  //
-  // This restriction is processed and completed by the currently active process
-  // bundle request:
-  //   A_done ⋃ (A_boundary ⋂ B_done)
-  //          ⋃ (A_boundary ⋂ C_done)
-  // and these restrictions will be processed by future process bundle requests:
-  //   A_boundary ⋂ B_todo (passed to SDF B directly)
-  //   A_boundary ⋂ C_todo (passed to SDF C directly)
-  //   A_todo (passed to SDF A directly)
-  //
-  // Note that descendants splits should only be reported if it is inexpensive
-  // to compute the boundary restriction intersected with descendants splits.
-  // Also note, that the boundary restriction may represent a set of elements
-  // produced by a parent primitive transform which can not be split at each
-  // element or that there are intermediate unsplittable primitive transforms
-  // between an ancestor splittable function and a descendant splittable
-  // function which may have more than one output per element. Finally note
-  // that the descendant splits should only be reported if the split
-  // information is relatively compact.
-  repeated PrimitiveTransformSplit splits = 1;
-}
-
-/*
- * Data Plane API
- */
-
-// Messages used to represent logical byte streams.
-// Stable
-message Elements {
-  // Represents multiple encoded elements in nested context for a given named
-  // instruction and target.
-  message Data {
-    // (Required) A reference to an active instruction request with the given
-    // instruction id.
-    string instruction_reference = 1;
-
-    // (Required) A definition representing a consumer or producer of this data.
-    // If received by a harness, this represents the consumer within that
-    // harness that should consume these bytes. If sent by a harness, this
-    // represents the producer of these bytes.
-    //
-    // Note that a single element may span multiple Data messages.
-    //
-    // Note that a sending/receiving pair should share the same target
-    // identifier.
-    Target target = 2;
-
-    // (Optional) Represents a part of a logical byte stream. Elements within
-    // the logical byte stream are encoded in the nested context and
-    // concatenated together.
-    //
-    // An empty data block represents the end of stream for the given
-    // instruction and target.
-    bytes data = 3;
-  }
-
-  // (Required) A list containing parts of logical byte streams.
-  repeated Data data = 1;
-}
-
-// Stable
-service BeamFnData {
-  // Used to send data between harnesses.
-  rpc Data(
-    // A stream of data representing input.
-    stream Elements
-  ) returns (
-    // A stream of data representing output.
-    stream Elements
-  ) {}
-}
-
-/*
- * State API
- */
-
-message StateRequest {
-  // (Required) An unique identifier provided by the SDK which represents this
-  // requests execution. The StateResponse corresponding with this request
-  // will have the matching id.
-  string id = 1;
-
-  // (Required) The associated instruction id of the work that is currently
-  // being processed. This allows for the runner to associate any modifications
-  // to state to be committed with the appropriate work execution.
-  string instruction_reference = 2;
-
-  // (Required) The state key this request is for.
-  StateKey state_key = 3;
-
-  // (Required) The action to take on this request.
-  oneof request {
-    // A request to get state.
-    StateGetRequest get = 1000;
-
-    // A request to append to state.
-    StateAppendRequest append = 1001;
-
-    // A request to clear state.
-    StateClearRequest clear = 1002;
-  }
-}
-
-message StateResponse {
-  // (Required) A reference provided by the SDK which represents a requests
-  // execution. The StateResponse must have the matching id when responding
-  // to the SDK.
-  string id = 1;
-
-  // (Optional) If this is specified, then the state request has failed.
-  // A human readable string representing the reason as to why the request
-  // failed.
-  string error = 2;
-
-  // (Optional) If this is specified, then the result of this state request
-  // can be cached using the supplied token.
-  bytes cache_token = 3;
-
-  // A corresponding response matching the request will be populated.
-  oneof response {
-    // A response to getting state.
-    StateGetResponse get = 1000;
-
-    // A response to appending to state.
-    StateAppendResponse append = 1001;
-
-    // A response to clearing state.
-    StateClearResponse clear = 1002;
-  }
-}
-
-service BeamFnState {
-  // Used to get/append/clear state stored by the runner on behalf of the SDK.
-  rpc State(
-    // A stream of state instructions requested of the runner.
-    stream StateRequest
-  ) returns (
-    // A stream of responses to state instructions the runner was asked to be
-    // performed.
-    stream StateResponse
-  ) {}
-}
-
-message StateKey {
-  message Runner {
-    // (Required) Opaque information supplied by the runner. Used to support
-    // remote references.
-    bytes key = 1;
-  }
-
-  message MultimapSideInput {
-    // (Required) The id of the PTransform containing a side input.
-    string ptransform_id = 1;
-    // (Required) The id of the side input.
-    string side_input_id = 2;
-    // (Required) The window (after mapping the currently executing elements
-    // window into the side input windows domain) encoded in a nested context.
-    bytes window = 3;
-    // (Required) The key encoded in a nested context.
-    bytes key = 4;
-  }
-
-  message BagUserState {
-    // (Required) The id of the PTransform containing user state.
-    string ptransform_id = 1;
-    // (Required) The id of the user state.
-    string user_state_id = 2;
-    // (Required) The window encoded in a nested context. 
-    bytes window = 3;
-    // (Required) The key of the currently executing element encoded in a
-    // nested context.
-    bytes key = 4;
-  }
-
-  // (Required) One of the following state keys must be set.
-  oneof type {
-    Runner runner = 1;
-    MultimapSideInput multimap_side_input = 2;
-    BagUserState bag_user_state = 3;
-    // TODO: represent a state key for user map state
-  }
-}
-
-// A request to get state.
-message StateGetRequest {
-  // (Optional) If specified, signals to the runner that the response
-  // should resume from the following continuation token.
-  //
-  // If unspecified, signals to the runner that the response should start
-  // from the beginning of the logical continuable stream.
-  bytes continuation_token = 1;
-}
-
-// A response to get state representing a logical byte stream which can be
-// continued using the state API.
-message StateGetResponse {
-  // (Optional) If specified, represents a token which can be used with the
-  // state API to get the next chunk of this logical byte stream. The end of
-  // the logical byte stream is signalled by this field being unset.
-  bytes continuation_token = 1;
-
-  // Represents a part of a logical byte stream. Elements within
-  // the logical byte stream are encoded in the nested context and
-  // concatenated together.
-  bytes data = 2;
-}
-
-// A request to append state.
-message StateAppendRequest {
-  // Represents a part of a logical byte stream. Elements within
-  // the logical byte stream are encoded in the nested context and
-  // multiple append requests are concatenated together.
-  bytes data = 1;
-}
-
-// A response to append state.
-message StateAppendResponse {
-}
-
-// A request to clear state.
-message StateClearRequest {
-}
-
-// A response to clear state.
-message StateClearResponse {
-}
-
-/*
- * Logging API
- *
- * This is very stable. There can be some changes to how we define a LogEntry,
- * to increase/decrease the severity types, the way we format an exception/stack
- * trace, or the log site.
- */
-
-// A log entry
-message LogEntry {
-  // A list of log entries, enables buffering and batching of multiple
-  // log messages using the logging API.
-  message List {
-    // (Required) One or or more log messages.
-    repeated LogEntry log_entries = 1;
-  }
-
-  // The severity of the event described in a log entry, expressed as one of the
-  // severity levels listed below. For your reference, the levels are
-  // assigned the listed numeric values. The effect of using numeric values
-  // other than those listed is undefined.
-  //
-  // If you are writing log entries, you should map other severity encodings to
-  // one of these standard levels. For example, you might map all of
-  // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
-  //
-  // This list is intentionally not comprehensive; the intent is to provide a
-  // common set of "good enough" severity levels so that logging front ends
-  // can provide filtering and searching across log types. Users of the API are
-  // free not to use all severity levels in their log messages.
-  message Severity {
-    enum Enum {
-      UNSPECIFIED = 0;
-      // Trace level information, also the default log level unless
-      // another severity is specified.
-      TRACE = 1;
-      // Debugging information.
-      DEBUG = 2;
-      // Normal events.
-      INFO = 3;
-      // Normal but significant events, such as start up, shut down, or
-      // configuration.
-      NOTICE = 4;
-      // Warning events might cause problems.
-      WARN = 5;
-      // Error events are likely to cause problems.
-      ERROR = 6;
-      // Critical events cause severe problems or brief outages and may
-      // indicate that a person must take action.
-      CRITICAL = 7;
-    }
-  }
-
-  // (Required) The severity of the log statement.
-  Severity.Enum severity = 1;
-
-  // (Required) The time at which this log statement occurred.
-  google.protobuf.Timestamp timestamp = 2;
-
-  // (Required) A human readable message.
-  string message = 3;
-
-  // (Optional) An optional trace of the functions involved. For example, in
-  // Java this can include multiple causes and multiple suppressed exceptions.
-  string trace = 4;
-
-  // (Optional) A reference to the instruction this log statement is associated
-  // with.
-  string instruction_reference = 5;
-
-  // (Optional) A reference to the primitive transform this log statement is
-  // associated with.
-  string primitive_transform_reference = 6;
-
-  // (Optional) Human-readable name of the function or method being invoked,
-  // with optional context such as the class or package name. The format can
-  // vary by language. For example:
-  //   qual.if.ied.Class.method (Java)
-  //   dir/package.func (Go)
-  //   module.function (Python)
-  //   file.cc:382 (C++)
-  string log_location = 7;
-
-  // (Optional) The name of the thread this log statement is associated with.
-  string thread = 8;
-}
-
-message LogControl {
-}
-
-// Stable
-service BeamFnLogging {
-  // Allows for the SDK to emit log entries which the runner can
-  // associate with the active job.
-  rpc Logging(
-    // A stream of log entries batched into lists emitted by the SDK harness.
-    stream LogEntry.List
-  ) returns (
-    // A stream of log control messages used to configure the SDK.
-    stream LogControl
-  ) {}
-}
-
-/*
- * Environment types
- */
-// A Docker container configuration for launching the SDK harness to execute
-// user specified functions.
-message DockerContainer {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1;
-
-  // (Required) The Docker container URI
-  // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
-  string uri = 2;
-
-  // (Optional) Docker registry specification.
-  // If unspecified, the uri is expected to be able to be fetched without
-  // requiring additional configuration by a runner.
-  string registry_reference = 3;
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto b/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
deleted file mode 100644
index b0cd6b4..0000000
--- a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Provision API, for communicating with a runner
- * for job and environment provisioning information over GRPC.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.fn.v1;
-
-option java_package = "org.apache.beam.fn.v1";
-option java_outer_classname = "ProvisionApi";
-
-import "google/protobuf/struct.proto";
-
-// A service to provide runtime provisioning information to the SDK harness
-// worker instances -- such as pipeline options, resource constraints and
-// other job metadata -- needed by an SDK harness instance to initialize.
-service ProvisionService {
-    // Get provision information for the SDK harness worker instance.
-    rpc GetProvisionInfo(GetProvisionInfoRequest) returns (GetProvisionInfoResponse);
-}
-
-// A request to get the provision info of a SDK harness worker instance.
-message GetProvisionInfoRequest { }
-
-// A response containing the provision info of a SDK harness worker instance.
-message GetProvisionInfoResponse {
-    ProvisionInfo info = 1;
-}
-
-// Runtime provisioning information for a SDK harness worker instance,
-// such as pipeline options, resource constraints and other job metadata
-message ProvisionInfo {
-    // (required) The job ID.
-    string job_id = 1;
-    // (required) The job name.
-    string job_name = 2;
-
-    // (required) Pipeline options. For non-template jobs, the options are
-    // identical to what is passed to job submission.
-    google.protobuf.Struct pipeline_options = 3;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
deleted file mode 100644
index f37b2d3..0000000
--- a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
+++ /dev/null
@@ -1,195 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This file is broken into multiple sections delimited by ---. Each section specifies a set of
-# reference encodings for a single standardized coder used in a specific context.
-#
-# Each section contains up to 3 properties:
-#
-#   coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
-#   nested: a boolean meaning whether the coder was used in the nested context. Missing means to
-#           test both contexts, a shorthand for when the coder is invariant across context.
-#   examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
-#             The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
-#             one of a few standard JSON types such as numbers, strings, dicts that map naturally
-#             to the type encoded by the coder.
-#
-# These choices were made to strike a balance between portability, ease of use, and simple
-# legibility of this file itself.
-#
-# It is expected that future work will move the `coder` field into a format that it would be
-# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
-#
-# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
-
-
-coder:
-  urn: "urn:beam:coders:bytes:0.1"
-nested: false
-examples:
-  "abc": abc
-  "ab\0c": "ab\0c"
-
----
-
-coder:
-  urn: "urn:beam:coders:bytes:0.1"
-nested: true
-examples:
-  "\u0003abc": abc
-  "\u0004ab\0c": "ab\0c"
-  "\u00c8\u0001       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|":
-              "       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|"
-
----
-
-coder:
-  urn: "urn:beam:coders:varint:0.1"
-examples:
-  "\0": 0
-  "\u0001": 1
-  "\u000A": 10
-  "\u00c8\u0001": 200
-  "\u00e8\u0007": 1000
-  "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:varint:0.1"}]
-examples:
-  "\u0003abc\0": {key: abc, value: 0}
-  "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:bytes:0.1"}]
-nested: false
-examples:
-  "\u0003abcdef": {key: abc, value: def}
-  "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:bytes:0.1"}]
-nested: true
-examples:
-  "\u0003abc\u0003def": {key: abc, value: def}
-  "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
-  urn: "urn:beam:coders:interval_window:0.1"
-examples:
-  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
-  "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
-  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
-  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"}]
-examples:
-  "\0\0\0\u0001\0": [0]
-  "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
-  "\0\0\0\0": []
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"}]
-examples:
-  "\0\0\0\u0001\u0003abc": ["abc"]
-  "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
-  "\0\0\0\0": []
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"}]
-  # This is for iterables of unknown length, where the encoding is not
-  # deterministic.
-  non_deterministic: True
-examples:
-  "\u00ff\u00ff\u00ff\u00ff\u0000": []
-  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
-  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:global_window:0.1"}]
-examples:
-  "\0\0\0\u0001": [""]
-
----
-
-coder:
-  urn: "urn:beam:coders:global_window:0.1"
-examples:
-  "": ""
-
----
-
-# All windowed values consist of pane infos that represent NO_FIRING until full support is added
-# in the Python SDK (BEAM-1522).
-coder:
-  urn: "urn:beam:coders:windowed_value:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"},
-               {urn: "urn:beam:coders:global_window:0.1"}]
-examples:
-  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
-    value: 2,
-    timestamp: 1454293425000,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: ["global"]
-  }
-
----
-
-coder:
-  urn: "urn:beam:coders:windowed_value:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"},
-               {urn: "urn:beam:coders:interval_window:0.1"}]
-examples:
-  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
-    value: 4,
-    timestamp: -400000,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: [{end: 1454293425000, span: 280000}]
-  }
-
-  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
-    value: 2,
-    timestamp: -100,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
-  }

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index d950759..d4ecad0 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -33,6 +33,5 @@
   <name>Apache Beam :: SDKs :: Common</name>
 
   <modules>
-    <module>fn-api</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 36271eb..6c74cd9 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -302,7 +302,7 @@
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
+      <artifactId>beam-model-fn-execution</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 7f9156c..de24f7f 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -51,10 +51,10 @@
                     <include>com.google.guava:guava</include>
                     <!-- java harness dependencies that are not staged -->
                     <include>org.apache.beam:beam-model-pipeline</include>
+                    <include>org.apache.beam:beam-model-fn-execution</include>
                     <include>org.apache.beam:beam-runners-core-construction-java</include>
                     <include>org.apache.beam:beam-runners-core-java</include>
                     <include>org.apache.beam:beam-runners-google-cloud-dataflow-java</include>
-                    <include>org.apache.beam:beam-sdks-common-fn-api</include>
                     <include>io.netty:netty-transport-native-epoll</include>
                   </includes>
                 </artifactSet>
@@ -131,6 +131,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-fn-execution</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
@@ -164,11 +169,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>