You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/11 19:25:59 UTC
[03/12] beam git commit: Move sdks/common/fn-api protos to
model/fn-execution
Move sdks/common/fn-api protos to model/fn-execution
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e233af9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e233af9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e233af9a
Branch: refs/heads/master
Commit: e233af9a2ad8c45153315d1daffbc5191eb176a8
Parents: 2f3af31
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 5 14:15:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 11 09:53:37 2017 -0700
----------------------------------------------------------------------
model/fn-execution/pom.xml | 32 +
.../src/main/proto/beam_fn_api.proto | 726 +++++++++++++++++++
.../src/main/proto/beam_provision_api.proto | 60 ++
.../org/apache/beam/fn/v1/standard_coders.yaml | 195 +++++
pom.xml | 13 -
runners/apex/pom.xml | 2 +-
runners/core-java/pom.xml | 2 +-
runners/direct-java/pom.xml | 2 +-
runners/flink/pom.xml | 2 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
runners/spark/pom.xml | 2 +-
sdks/common/fn-api/pom.xml | 114 ---
.../fn-api/src/main/proto/beam_fn_api.proto | 726 -------------------
.../src/main/proto/beam_provision_api.proto | 60 --
.../org/apache/beam/fn/v1/standard_coders.yaml | 195 -----
sdks/common/pom.xml | 1 -
sdks/java/core/pom.xml | 2 +-
sdks/java/harness/pom.xml | 12 +-
18 files changed, 1026 insertions(+), 1122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/model/fn-execution/pom.xml b/model/fn-execution/pom.xml
index 807feb6..b5b5fdf 100644
--- a/model/fn-execution/pom.xml
+++ b/model/fn-execution/pom.xml
@@ -79,4 +79,36 @@
</plugin>
</plugins>
</build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-model-pipeline</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
new file mode 100644
index 0000000..5a01077
--- /dev/null
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Fn API and bootstrapping.
+ *
+ * TODO: Usage of plural names in lists looks awkward in Java
+ * e.g. getOutputsMap, addCodersBuilder
+ *
+ * TODO: gRPC / proto field names conflict with generated code
+ * e.g. "class" in java, "output" in python
+ */
+
+syntax = "proto3";
+
+/* TODO: Consider consolidating common components in another package
+ * and language namespaces for re-use with Runner Api.
+ */
+
+package org.apache.beam.fn.v1;
+
+option java_package = "org.apache.beam.fn.v1";
+option java_outer_classname = "BeamFnApi";
+
+import "beam_runner_api.proto";
+import "endpoints.proto";
+import "google/protobuf/timestamp.proto";
+
+/*
+ * Constructs that define the pipeline shape.
+ *
+ * These are mostly unstable due to the missing pieces to be shared with
+ * the Runner Api like windowing strategy, display data, .... There are still
+ * some modelling questions related to whether a side input is modelled
+ * as another field on a PrimitiveTransform or as part of inputs and we
+ * still are missing things like the CompositeTransform.
+ */
+
+// A representation of an input or output definition on a primitive transform.
+// Stable
+message Target {
+ // A repeated list of target definitions.
+ message List {
+ repeated Target target = 1;
+ }
+
+ // (Required) The id of the PrimitiveTransform which is the target.
+ string primitive_transform_reference = 1;
+
+ // (Required) The local name of an input or output defined on the primitive
+ // transform.
+ string name = 2;
+}
+
+// A descriptor for connecting to a remote port using the Beam Fn Data API.
+// Allows for communication between two environments (for example between the
+// runner and the SDK).
+// Stable
+message RemoteGrpcPort {
+ // (Required) An API descriptor which describes where to
+ // connect to including any authentication that is required.
+ org.apache.beam.portability.v1.ApiServiceDescriptor api_service_descriptor = 1;
+}
+
+/*
+ * Control Plane API
+ *
+ * Progress reporting and splitting still need further vetting. Also, this may change
+ * with the addition of new types of instructions/responses related to metrics.
+ */
+
+// An API that describes the work that an SDK harness is meant to do.
+// Stable
+service BeamFnControl {
+ // Instructions sent by the runner to the SDK requesting different types
+ // of work.
+ rpc Control(
+ // A stream of responses to instructions the SDK was asked to perform.
+ stream InstructionResponse
+ ) returns (
+ // A stream of instructions requested of the SDK to be performed.
+ stream InstructionRequest
+ ) {}
+}
+
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
+// Stable
+message InstructionRequest {
+ // (Required) A unique identifier provided by the runner which represents
+ // this request's execution. The InstructionResponse MUST have the matching id.
+ string instruction_id = 1;
+
+ // (Required) A request that the SDK Harness needs to interpret.
+ oneof request {
+ RegisterRequest register = 1000;
+ ProcessBundleRequest process_bundle = 1001;
+ ProcessBundleProgressRequest process_bundle_progress = 1002;
+ ProcessBundleSplitRequest process_bundle_split = 1003;
+ }
+}
+
+// The response for an associated request the SDK had been asked to fulfill.
+// Stable
+message InstructionResponse {
+ // (Required) A reference provided by the runner which represents a request's
+ // execution. The InstructionResponse MUST have the matching id when
+ // responding to the runner.
+ string instruction_id = 1;
+
+ // (Optional) If this is specified, then this instruction has failed.
+ // A human readable string representing the reason as to why processing has
+ // failed.
+ string error = 2;
+
+ // If the instruction did not fail, it is required to return an equivalent
+ // response type depending on the request this matches.
+ oneof response {
+ RegisterResponse register = 1000;
+ ProcessBundleResponse process_bundle = 1001;
+ ProcessBundleProgressResponse process_bundle_progress = 1002;
+ ProcessBundleSplitResponse process_bundle_split = 1003;
+ }
+}
+
+// A list of objects which can be referred to by the runner in
+// future requests.
+// Stable
+message RegisterRequest {
+ // (Optional) The set of descriptors used to process bundles.
+ repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
+}
+
+// Stable
+message RegisterResponse {
+}
+
+// Definitions that should be used to construct the bundle processing graph.
+message ProcessBundleDescriptor {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ string id = 1;
+
+ // (Required) A map from pipeline-scoped id to PTransform.
+ map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
+
+ // (Required) A map from pipeline-scoped id to PCollection.
+ map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
+
+ // (Required) A map from pipeline-scoped id to WindowingStrategy.
+ map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4;
+
+ // (Required) A map from pipeline-scoped id to Coder.
+ map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
+
+ // (Required) A map from pipeline-scoped id to Environment.
+ map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
+
+ // A descriptor describing the end point to use for State API
+ // calls. Required if the Runner intends to send remote references over the
+ // data plane or if any of the transforms rely on user state or side inputs.
+ org.apache.beam.portability.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
+}
+
+// A request to process a given bundle.
+// Stable
+message ProcessBundleRequest {
+ // (Required) A reference to the process bundle descriptor that must be
+ // instantiated and executed by the SDK harness.
+ string process_bundle_descriptor_reference = 1;
+
+ // (Optional) A list of cache tokens that can be used by an SDK to reuse
+ // cached data returned by the State API across multiple bundles.
+ repeated bytes cache_tokens = 2;
+}
+
+// Stable
+message ProcessBundleResponse {
+ // (Optional) If metrics reporting is supported by the SDK, this represents
+ // the final metrics to record for this bundle.
+ Metrics metrics = 1;
+}
+
+// A request to report progress information for a given bundle.
+// This is an optional request to be handled and is used to support advanced
+// SDK features such as SplittableDoFn, user level metrics etc.
+message ProcessBundleProgressRequest {
+ // (Required) A reference to an active process bundle request with the given
+ // instruction id.
+ string instruction_reference = 1;
+}
+
+message Metrics {
+ // PTransform level metrics.
+ // These metrics are split into processed and active element groups for
+ // progress reporting purposes. This allows a Runner to see what is measured,
+ // what is estimated and what can be extrapolated to be able to accurately
+ // estimate the backlog of remaining work.
+ message PTransform {
+ // Metrics that are measured for processed and active element groups.
+ message Measured {
+ // (Required) Map from local input name to number of elements processed
+ // from this input.
+ map<string, int64> input_element_counts = 1;
+
+ // (Required) Map from local output name to number of elements produced
+ // for this output.
+ map<string, int64> output_element_counts = 2;
+
+ // (Optional) The total time spent so far in processing the elements in
+ // this group.
+ int64 total_time_spent = 3;
+
+ // TODO: Add other element group level metrics.
+ }
+
+ // Metrics for fully processed elements.
+ message ProcessedElements {
+ // (Required)
+ Measured measured = 1;
+ }
+
+ // Metrics for active elements.
+ // An element is considered active if the SDK has started but not finished
+ // processing it yet.
+ message ActiveElements {
+ // (Required)
+ Measured measured = 1;
+
+ // Estimated metrics.
+
+ // (Optional) Sum of estimated fraction of known work remaining for all
+ // active elements, as reported by this transform.
+ // If not reported, a Runner could extrapolate this from the processed
+ // elements.
+ // TODO: Handle the case when known work is infinite.
+ double fraction_remaining = 2;
+
+ // (Optional) Map from local output name to sum of estimated number
+ // of elements remaining for this output from all active elements,
+ // as reported by this transform.
+ // If not reported, a Runner could extrapolate this from the processed
+ // elements.
+ map<string, int64> output_elements_remaining = 3;
+ }
+
+ // (Required): Metrics for processed elements.
+ ProcessedElements processed_elements = 1;
+ // (Required): Metrics for active elements.
+ ActiveElements active_elements = 2;
+
+ // (Optional): Map from local output name to its watermark.
+ // The watermarks reported are tentative, to get a better sense of progress
+ // while processing a bundle but before it is committed. At bundle commit
+ // time, a Runner needs to also take into account the timers set to compute
+ // the actual watermarks.
+ map<string, int64> watermarks = 3;
+
+ // TODO: Define other transform level system metrics.
+ }
+
+ // User defined metrics
+ message User {
+ // TODO: Define it.
+ }
+
+ map<string, PTransform> ptransforms = 1;
+ map<string, User> user = 2;
+}
+
+message ProcessBundleProgressResponse {
+ // (Required)
+ Metrics metrics = 1;
+}
+
+message ProcessBundleSplitRequest {
+ // (Required) A reference to an active process bundle request with the given
+ // instruction id.
+ string instruction_reference = 1;
+
+ // (Required) The fraction of work (when compared to the known amount of work)
+ // the process bundle request should try to split at.
+ double fraction = 2;
+}
+
+// urn:org.apache.beam:restriction:element-count:1.0
+message ElementCountRestriction {
+ // A restriction representing the number of elements that should be processed.
+ // Effectively the range [0, count]
+ int64 count = 1;
+}
+
+// urn:org.apache.beam:restriction:element-count-skip:1.0
+message ElementCountSkipRestriction {
+ // A restriction representing the number of elements that should be skipped.
+ // Effectively the range (count, infinity]
+ int64 count = 1;
+}
+
+// Each primitive transform that is splittable is defined by a restriction
+// it is currently processing. During splitting, that currently active
+// restriction (R_initial) is split into 2 components:
+// * a restriction (R_done) representing all elements that will be fully
+// processed
+// * a restriction (R_todo) representing all elements that will not be fully
+// processed
+//
+// where:
+// R_initial = R_done ⋃ R_todo
+message PrimitiveTransformSplit {
+ // (Required) A reference to a primitive transform with the given id that
+ // is part of the active process bundle request with the given instruction
+ // id.
+ string primitive_transform_reference = 1;
+
+ // (Required) A function specification describing the restriction
+ // that has been completed by the primitive transform.
+ //
+ // For example, a remote GRPC source will have a specific urn and data
+ // block containing an ElementCountRestriction.
+ org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
+
+ // (Required) A function specification describing the restriction
+ // representing the remainder of work for the primitive transform.
+ //
+ // For example, a remote GRPC source will have a specific urn and data
+ // block containing an ElementCountSkipRestriction.
+ org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
+}
+
+message ProcessBundleSplitResponse {
+ // (Optional) A set of split responses for a currently active work item.
+ //
+ // If primitive transform B is a descendant of primitive transform A and both
+ // A and B report a split, then B's restriction is reported as an element
+ // restriction pair and thus the fully reported restriction is:
+ // R = A_done
+ // ⋃ (A_boundary ⋂ B_done)
+ // ⋃ (A_boundary ⋂ B_todo)
+ // ⋃ A_todo
+ // If there is a descendant of B named C, then C would similarly report a
+ // set of element pair restrictions.
+ //
+ // This restriction is processed and completed by the currently active process
+ // bundle request:
+ // A_done ⋃ (A_boundary ⋂ B_done)
+ // and these restrictions will be processed by future process bundle requests:
+ // A_boundary ⋂ B_todo (passed to SDF B directly)
+ // A_todo (passed to SDF A directly)
+
+ // If primitive transform B and C are siblings and descendants of A and A, B,
+ // and C report a split, then B and C's restrictions are relative to A's.
+ // R = A_done
+ // ⋃ (A_boundary ⋂ B_done)
+ // ⋃ (A_boundary ⋂ B_todo)
+ // ⋃ (A_boundary ⋂ C_done)
+ // ⋃ (A_boundary ⋂ C_todo)
+ // ⋃ A_todo
+ // If there is no descendant of B or C also reporting a split, then
+ // B_boundary = ∅ and C_boundary = ∅
+ //
+ // This restriction is processed and completed by the currently active process
+ // bundle request:
+ // A_done ⋃ (A_boundary ⋂ B_done)
+ // ⋃ (A_boundary ⋂ C_done)
+ // and these restrictions will be processed by future process bundle requests:
+ // A_boundary ⋂ B_todo (passed to SDF B directly)
+ // A_boundary ⋂ C_todo (passed to SDF C directly)
+ // A_todo (passed to SDF A directly)
+ //
+ // Note that descendant splits should only be reported if it is inexpensive
+ // to compute the boundary restriction intersected with descendant splits.
+ // Also note, that the boundary restriction may represent a set of elements
+ // produced by a parent primitive transform which can not be split at each
+ // element or that there are intermediate unsplittable primitive transforms
+ // between an ancestor splittable function and a descendant splittable
+ // function which may have more than one output per element. Finally note
+ // that the descendant splits should only be reported if the split
+ // information is relatively compact.
+ repeated PrimitiveTransformSplit splits = 1;
+}
+
+/*
+ * Data Plane API
+ */
+
+// Messages used to represent logical byte streams.
+// Stable
+message Elements {
+ // Represents multiple encoded elements in nested context for a given named
+ // instruction and target.
+ message Data {
+ // (Required) A reference to an active instruction request with the given
+ // instruction id.
+ string instruction_reference = 1;
+
+ // (Required) A definition representing a consumer or producer of this data.
+ // If received by a harness, this represents the consumer within that
+ // harness that should consume these bytes. If sent by a harness, this
+ // represents the producer of these bytes.
+ //
+ // Note that a single element may span multiple Data messages.
+ //
+ // Note that a sending/receiving pair should share the same target
+ // identifier.
+ Target target = 2;
+
+ // (Optional) Represents a part of a logical byte stream. Elements within
+ // the logical byte stream are encoded in the nested context and
+ // concatenated together.
+ //
+ // An empty data block represents the end of stream for the given
+ // instruction and target.
+ bytes data = 3;
+ }
+
+ // (Required) A list containing parts of logical byte streams.
+ repeated Data data = 1;
+}
+
+// Stable
+service BeamFnData {
+ // Used to send data between harnesses.
+ rpc Data(
+ // A stream of data representing input.
+ stream Elements
+ ) returns (
+ // A stream of data representing output.
+ stream Elements
+ ) {}
+}
+
+/*
+ * State API
+ */
+
+message StateRequest {
+ // (Required) A unique identifier provided by the SDK which represents this
+ // request's execution. The StateResponse corresponding with this request
+ // will have the matching id.
+ string id = 1;
+
+ // (Required) The associated instruction id of the work that is currently
+ // being processed. This allows for the runner to associate any modifications
+ // to state to be committed with the appropriate work execution.
+ string instruction_reference = 2;
+
+ // (Required) The state key this request is for.
+ StateKey state_key = 3;
+
+ // (Required) The action to take on this request.
+ oneof request {
+ // A request to get state.
+ StateGetRequest get = 1000;
+
+ // A request to append to state.
+ StateAppendRequest append = 1001;
+
+ // A request to clear state.
+ StateClearRequest clear = 1002;
+ }
+}
+
+message StateResponse {
+ // (Required) A reference provided by the SDK which represents a request's
+ // execution. The StateResponse must have the matching id when responding
+ // to the SDK.
+ string id = 1;
+
+ // (Optional) If this is specified, then the state request has failed.
+ // A human readable string representing the reason as to why the request
+ // failed.
+ string error = 2;
+
+ // (Optional) If this is specified, then the result of this state request
+ // can be cached using the supplied token.
+ bytes cache_token = 3;
+
+ // A corresponding response matching the request will be populated.
+ oneof response {
+ // A response to getting state.
+ StateGetResponse get = 1000;
+
+ // A response to appending to state.
+ StateAppendResponse append = 1001;
+
+ // A response to clearing state.
+ StateClearResponse clear = 1002;
+ }
+}
+
+service BeamFnState {
+ // Used to get/append/clear state stored by the runner on behalf of the SDK.
+ rpc State(
+ // A stream of state instructions requested of the runner.
+ stream StateRequest
+ ) returns (
+ // A stream of responses to the state instructions the runner was asked
+ // to perform.
+ stream StateResponse
+ ) {}
+}
+
+message StateKey {
+ message Runner {
+ // (Required) Opaque information supplied by the runner. Used to support
+ // remote references.
+ bytes key = 1;
+ }
+
+ message MultimapSideInput {
+ // (Required) The id of the PTransform containing a side input.
+ string ptransform_id = 1;
+ // (Required) The id of the side input.
+ string side_input_id = 2;
+ // (Required) The window (after mapping the currently executing element's
+ // window into the side input's window domain) encoded in a nested context.
+ bytes window = 3;
+ // (Required) The key encoded in a nested context.
+ bytes key = 4;
+ }
+
+ message BagUserState {
+ // (Required) The id of the PTransform containing user state.
+ string ptransform_id = 1;
+ // (Required) The id of the user state.
+ string user_state_id = 2;
+ // (Required) The window encoded in a nested context.
+ bytes window = 3;
+ // (Required) The key of the currently executing element encoded in a
+ // nested context.
+ bytes key = 4;
+ }
+
+ // (Required) One of the following state keys must be set.
+ oneof type {
+ Runner runner = 1;
+ MultimapSideInput multimap_side_input = 2;
+ BagUserState bag_user_state = 3;
+ // TODO: represent a state key for user map state
+ }
+}
+
+// A request to get state.
+message StateGetRequest {
+ // (Optional) If specified, signals to the runner that the response
+ // should resume from the following continuation token.
+ //
+ // If unspecified, signals to the runner that the response should start
+ // from the beginning of the logical continuable stream.
+ bytes continuation_token = 1;
+}
+
+// A response to get state representing a logical byte stream which can be
+// continued using the state API.
+message StateGetResponse {
+ // (Optional) If specified, represents a token which can be used with the
+ // state API to get the next chunk of this logical byte stream. The end of
+ // the logical byte stream is signalled by this field being unset.
+ bytes continuation_token = 1;
+
+ // Represents a part of a logical byte stream. Elements within
+ // the logical byte stream are encoded in the nested context and
+ // concatenated together.
+ bytes data = 2;
+}
+
+// A request to append state.
+message StateAppendRequest {
+ // Represents a part of a logical byte stream. Elements within
+ // the logical byte stream are encoded in the nested context and
+ // multiple append requests are concatenated together.
+ bytes data = 1;
+}
+
+// A response to append state.
+message StateAppendResponse {
+}
+
+// A request to clear state.
+message StateClearRequest {
+}
+
+// A response to clear state.
+message StateClearResponse {
+}
+
+/*
+ * Logging API
+ *
+ * This is very stable. There can be some changes to how we define a LogEntry,
+ * to increase/decrease the severity types, the way we format an exception/stack
+ * trace, or the log site.
+ */
+
+// A log entry
+message LogEntry {
+ // A list of log entries, enables buffering and batching of multiple
+ // log messages using the logging API.
+ message List {
+ // (Required) One or more log messages.
+ repeated LogEntry log_entries = 1;
+ }
+
+ // The severity of the event described in a log entry, expressed as one of the
+ // severity levels listed below. For your reference, the levels are
+ // assigned the listed numeric values. The effect of using numeric values
+ // other than those listed is undefined.
+ //
+ // If you are writing log entries, you should map other severity encodings to
+ // one of these standard levels. For example, you might map all of
+ // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
+ //
+ // This list is intentionally not comprehensive; the intent is to provide a
+ // common set of "good enough" severity levels so that logging front ends
+ // can provide filtering and searching across log types. Users of the API are
+ // free not to use all severity levels in their log messages.
+ message Severity {
+ enum Enum {
+ UNSPECIFIED = 0;
+ // Trace level information, also the default log level unless
+ // another severity is specified.
+ TRACE = 1;
+ // Debugging information.
+ DEBUG = 2;
+ // Normal events.
+ INFO = 3;
+ // Normal but significant events, such as start up, shut down, or
+ // configuration.
+ NOTICE = 4;
+ // Warning events might cause problems.
+ WARN = 5;
+ // Error events are likely to cause problems.
+ ERROR = 6;
+ // Critical events cause severe problems or brief outages and may
+ // indicate that a person must take action.
+ CRITICAL = 7;
+ }
+ }
+
+ // (Required) The severity of the log statement.
+ Severity.Enum severity = 1;
+
+ // (Required) The time at which this log statement occurred.
+ google.protobuf.Timestamp timestamp = 2;
+
+ // (Required) A human readable message.
+ string message = 3;
+
+ // (Optional) An optional trace of the functions involved. For example, in
+ // Java this can include multiple causes and multiple suppressed exceptions.
+ string trace = 4;
+
+ // (Optional) A reference to the instruction this log statement is associated
+ // with.
+ string instruction_reference = 5;
+
+ // (Optional) A reference to the primitive transform this log statement is
+ // associated with.
+ string primitive_transform_reference = 6;
+
+ // (Optional) Human-readable name of the function or method being invoked,
+ // with optional context such as the class or package name. The format can
+ // vary by language. For example:
+ // qual.if.ied.Class.method (Java)
+ // dir/package.func (Go)
+ // module.function (Python)
+ // file.cc:382 (C++)
+ string log_location = 7;
+
+ // (Optional) The name of the thread this log statement is associated with.
+ string thread = 8;
+}
+
+message LogControl {
+}
+
+// Stable
+service BeamFnLogging {
+ // Allows for the SDK to emit log entries which the runner can
+ // associate with the active job.
+ rpc Logging(
+ // A stream of log entries batched into lists emitted by the SDK harness.
+ stream LogEntry.List
+ ) returns (
+ // A stream of log control messages used to configure the SDK.
+ stream LogControl
+ ) {}
+}
+
+/*
+ * Environment types
+ */
+// A Docker container configuration for launching the SDK harness to execute
+// user specified functions.
+message DockerContainer {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ string id = 1;
+
+ // (Required) The Docker container URI
+ // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
+ string uri = 2;
+
+ // (Optional) Docker registry specification.
+ // If unspecified, the uri is expected to be able to be fetched without
+ // requiring additional configuration by a runner.
+ string registry_reference = 3;
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/src/main/proto/beam_provision_api.proto
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/main/proto/beam_provision_api.proto b/model/fn-execution/src/main/proto/beam_provision_api.proto
new file mode 100644
index 0000000..b0cd6b4
--- /dev/null
+++ b/model/fn-execution/src/main/proto/beam_provision_api.proto
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Provision API, for communicating with a runner
+ * for job and environment provisioning information over GRPC.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.fn.v1;
+
+option java_package = "org.apache.beam.fn.v1";
+option java_outer_classname = "ProvisionApi";
+
+import "google/protobuf/struct.proto";
+
+// A service to provide runtime provisioning information to the SDK harness
+// worker instances -- such as pipeline options, resource constraints and
+// other job metadata -- needed by an SDK harness instance to initialize.
+service ProvisionService {
+ // Get provision information for the SDK harness worker instance.
+ rpc GetProvisionInfo(GetProvisionInfoRequest) returns (GetProvisionInfoResponse);
+}
+
+// A request to get the provision info of a SDK harness worker instance.
+message GetProvisionInfoRequest { }
+
+// A response containing the provision info of a SDK harness worker instance.
+message GetProvisionInfoResponse {
+ ProvisionInfo info = 1;
+}
+
+// Runtime provisioning information for a SDK harness worker instance,
+// such as pipeline options, resource constraints and other job metadata
+message ProvisionInfo {
+ // (required) The job ID.
+ string job_id = 1;
+ // (required) The job name.
+ string job_name = 2;
+
+ // (required) Pipeline options. For non-template jobs, the options are
+ // identical to what is passed to job submission.
+ google.protobuf.Struct pipeline_options = 3;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
new file mode 100644
index 0000000..f37b2d3
--- /dev/null
+++ b/model/fn-execution/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is broken into multiple sections delimited by ---. Each section specifies a set of
+# reference encodings for a single standardized coder used in a specific context.
+#
+# Each section contains up to 3 properties:
+#
+# coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
+# nested: a boolean meaning whether the coder was used in the nested context. Missing means to
+# test both contexts, a shorthand for when the coder is invariant across context.
+# examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
+# The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
+# one of a few standard JSON types such as numbers, strings, dicts that map naturally
+# to the type encoded by the coder.
+#
+# These choices were made to strike a balance between portability, ease of use, and simple
+# legibility of this file itself.
+#
+# It is expected that future work will move the `coder` field into a format that it would be
+# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
+#
+# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
+
+
+coder:
+ urn: "urn:beam:coders:bytes:0.1"
+nested: false
+examples:
+ "abc": abc
+ "ab\0c": "ab\0c"
+
+---
+
+coder:
+ urn: "urn:beam:coders:bytes:0.1"
+nested: true
+examples:
+ "\u0003abc": abc
+ "\u0004ab\0c": "ab\0c"
+ "\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|":
+ " 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|"
+
+---
+
+coder:
+ urn: "urn:beam:coders:varint:0.1"
+examples:
+ "\0": 0
+ "\u0001": 1
+ "\u000A": 10
+ "\u00c8\u0001": 200
+ "\u00e8\u0007": 1000
+ "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
+
+---
+
+coder:
+ urn: "urn:beam:coders:kv:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"},
+ {urn: "urn:beam:coders:varint:0.1"}]
+examples:
+ "\u0003abc\0": {key: abc, value: 0}
+ "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
+
+---
+
+coder:
+ urn: "urn:beam:coders:kv:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"},
+ {urn: "urn:beam:coders:bytes:0.1"}]
+nested: false
+examples:
+ "\u0003abcdef": {key: abc, value: def}
+ "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+ urn: "urn:beam:coders:kv:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"},
+ {urn: "urn:beam:coders:bytes:0.1"}]
+nested: true
+examples:
+ "\u0003abc\u0003def": {key: abc, value: def}
+ "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+ urn: "urn:beam:coders:interval_window:0.1"
+examples:
+ "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
+ "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
+ "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
+ "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:varint:0.1"}]
+examples:
+ "\0\0\0\u0001\0": [0]
+ "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
+ "\0\0\0\0": []
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"}]
+examples:
+ "\0\0\0\u0001\u0003abc": ["abc"]
+ "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
+ "\0\0\0\0": []
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"}]
+ # This is for iterables of unknown length, where the encoding is not
+ # deterministic.
+ non_deterministic: True
+examples:
+ "\u00ff\u00ff\u00ff\u00ff\u0000": []
+ "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
+ "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+ "\0\0\0\u0001": [""]
+
+---
+
+coder:
+ urn: "urn:beam:coders:global_window:0.1"
+examples:
+ "": ""
+
+---
+
+# All windowed values consist of pane infos that represent NO_FIRING until full support is added
+# in the Python SDK (BEAM-1522).
+coder:
+ urn: "urn:beam:coders:windowed_value:0.1"
+ components: [{urn: "urn:beam:coders:varint:0.1"},
+ {urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+ "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
+ value: 2,
+ timestamp: 1454293425000,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ windows: ["global"]
+ }
+
+---
+
+coder:
+ urn: "urn:beam:coders:windowed_value:0.1"
+ components: [{urn: "urn:beam:coders:varint:0.1"},
+ {urn: "urn:beam:coders:interval_window:0.1"}]
+examples:
+ "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
+ value: 4,
+ timestamp: -400000,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ windows: [{end: 1454293425000, span: 280000}]
+ }
+
+ "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
+ value: 2,
+ timestamp: -100,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
+ }
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 212d703..efedb1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,19 +428,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8ade583..f70e67e 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -179,7 +179,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 668c08c..087e24d 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -69,7 +69,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 4e28438..6e356fc 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -327,7 +327,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index effbdb4..e77dbc8 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -371,7 +371,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 873bb2e..da0d237 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -466,7 +466,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 3e9095a..0ba6125 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -333,7 +333,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
deleted file mode 100644
index 498f83a..0000000
--- a/sdks/common/fn-api/pom.xml
+++ /dev/null
@@ -1,114 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <packaging>jar</packaging>
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-parent</artifactId>
- <version>2.3.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-sdks-common-fn-api</artifactId>
- <name>Apache Beam :: SDKs :: Common :: Fn API</name>
- <description>This artifact generates the stub bindings.</description>
-
- <build>
- <resources>
- <resource>
- <directory>src/test/resources</directory>
- <filtering>true</filtering>
- </resource>
- <resource>
- <directory>${project.build.directory}/original_sources_to_package</directory>
- </resource>
- </resources>
-
- <plugins>
- <!-- Skip the checkstyle plugin on generated code -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
-
- <!-- Skip the findbugs plugin on generated code -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
- <pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>compile-custom</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-model-pipeline</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-stub</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
deleted file mode 100644
index 5a01077..0000000
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ /dev/null
@@ -1,726 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Fn API and bootstrapping.
- *
- * TODO: Usage of plural names in lists looks awkward in Java
- * e.g. getOutputsMap, addCodersBuilder
- *
- * TODO: gRPC / proto field names conflict with generated code
- * e.g. "class" in java, "output" in python
- */
-
-syntax = "proto3";
-
-/* TODO: Consider consolidating common components in another package
- * and language namespaces for re-use with Runner Api.
- */
-
-package org.apache.beam.fn.v1;
-
-option java_package = "org.apache.beam.fn.v1";
-option java_outer_classname = "BeamFnApi";
-
-import "beam_runner_api.proto";
-import "endpoints.proto";
-import "google/protobuf/timestamp.proto";
-
-/*
- * Constructs that define the pipeline shape.
- *
- * These are mostly unstable due to the missing pieces to be shared with
- * the Runner Api like windowing strategy, display data, .... There are still
- * some modelling questions related to whether a side input is modelled
- * as another field on a PrimitiveTransform or as part of inputs and we
- * still are missing things like the CompositeTransform.
- */
-
-// A representation of an input or output definition on a primitive transform.
-// Stable
-message Target {
- // A repeated list of target definitions.
- message List {
- repeated Target target = 1;
- }
-
- // (Required) The id of the PrimitiveTransform which is the target.
- string primitive_transform_reference = 1;
-
- // (Required) The local name of an input or output defined on the primitive
- // transform.
- string name = 2;
-}
-
-// A descriptor for connecting to a remote port using the Beam Fn Data API.
-// Allows for communication between two environments (for example between the
-// runner and the SDK).
-// Stable
-message RemoteGrpcPort {
- // (Required) An API descriptor which describes where to
- // connect to including any authentication that is required.
- org.apache.beam.portability.v1.ApiServiceDescriptor api_service_descriptor = 1;
-}
-
-/*
- * Control Plane API
- *
- * Progress reporting and splitting still need further vetting. Also, this may change
- * with the addition of new types of instructions/responses related to metrics.
- */
-
-// An API that describes the work that a SDK harness is meant to do.
-// Stable
-service BeamFnControl {
- // Instructions sent by the runner to the SDK requesting different types
- // of work.
- rpc Control(
- // A stream of responses to instructions the SDK was asked to be performed.
- stream InstructionResponse
- ) returns (
- // A stream of instructions requested of the SDK to be performed.
- stream InstructionRequest
- ) {}
-}
-
-// A request sent by a runner which the SDK is asked to fulfill.
-// For any unsupported request type, an error should be returned with a
-// matching instruction id.
-// Stable
-message InstructionRequest {
- // (Required) A unique identifier provided by the runner which represents
- // this request's execution. The InstructionResponse MUST have the matching id.
- string instruction_id = 1;
-
- // (Required) A request that the SDK Harness needs to interpret.
- oneof request {
- RegisterRequest register = 1000;
- ProcessBundleRequest process_bundle = 1001;
- ProcessBundleProgressRequest process_bundle_progress = 1002;
- ProcessBundleSplitRequest process_bundle_split = 1003;
- }
-}
-
-// The response for an associated request the SDK had been asked to fulfill.
-// Stable
-message InstructionResponse {
- // (Required) A reference provided by the runner which represents a request's
- // execution. The InstructionResponse MUST have the matching id when
- // responding to the runner.
- string instruction_id = 1;
-
- // If this is specified, then this instruction has failed.
- // A human readable string representing the reason as to why processing has
- // failed.
- string error = 2;
-
- // If the instruction did not fail, it is required to return an equivalent
- // response type depending on the request this matches.
- oneof response {
- RegisterResponse register = 1000;
- ProcessBundleResponse process_bundle = 1001;
- ProcessBundleProgressResponse process_bundle_progress = 1002;
- ProcessBundleSplitResponse process_bundle_split = 1003;
- }
-}
-
-// A list of objects which can be referred to by the runner in
-// future requests.
-// Stable
-message RegisterRequest {
- // (Optional) The set of descriptors used to process bundles.
- repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
-}
-
-// Stable
-message RegisterResponse {
-}
-
-// Definitions that should be used to construct the bundle processing graph.
-message ProcessBundleDescriptor {
- // (Required) A pipeline level unique id which can be used as a reference to
- // refer to this.
- string id = 1;
-
- // (Required) A map from pipeline-scoped id to PTransform.
- map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
-
- // (Required) A map from pipeline-scoped id to PCollection.
- map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
-
- // (Required) A map from pipeline-scoped id to WindowingStrategy.
- map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4;
-
- // (Required) A map from pipeline-scoped id to Coder.
- map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
-
- // (Required) A map from pipeline-scoped id to Environment.
- map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
-
- // A descriptor describing the end point to use for State API
- // calls. Required if the Runner intends to send remote references over the
- // data plane or if any of the transforms rely on user state or side inputs.
- org.apache.beam.portability.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
-}
-
-// A request to process a given bundle.
-// Stable
-message ProcessBundleRequest {
- // (Required) A reference to the process bundle descriptor that must be
- // instantiated and executed by the SDK harness.
- string process_bundle_descriptor_reference = 1;
-
- // (Optional) A list of cache tokens that can be used by an SDK to reuse
- // cached data returned by the State API across multiple bundles.
- repeated bytes cache_tokens = 2;
-}
-
-// Stable
-message ProcessBundleResponse {
- // (Optional) If metrics reporting is supported by the SDK, this represents
- // the final metrics to record for this bundle.
- Metrics metrics = 1;
-}
-
-// A request to report progress information for a given bundle.
-// This is an optional request to be handled and is used to support advanced
-// SDK features such as SplittableDoFn, user level metrics etc.
-message ProcessBundleProgressRequest {
- // (Required) A reference to an active process bundle request with the given
- // instruction id.
- string instruction_reference = 1;
-}
-
-message Metrics {
- // PTransform level metrics.
- // These metrics are split into processed and active element groups for
- // progress reporting purposes. This allows a Runner to see what is measured,
- // what is estimated and what can be extrapolated to be able to accurately
- // estimate the backlog of remaining work.
- message PTransform {
- // Metrics that are measured for processed and active element groups.
- message Measured {
- // (Required) Map from local input name to number of elements processed
- // from this input.
- map<string, int64> input_element_counts = 1;
-
- // (Required) Map from local output name to number of elements produced
- // for this output.
- map<string, int64> output_element_counts = 2;
-
- // (Optional) The total time spent so far in processing the elements in
- // this group.
- int64 total_time_spent = 3;
-
- // TODO: Add other element group level metrics.
- }
-
- // Metrics for fully processed elements.
- message ProcessedElements {
- // (Required)
- Measured measured = 1;
- }
-
- // Metrics for active elements.
- // An element is considered active if the SDK has started but not finished
- // processing it yet.
- message ActiveElements {
- // (Required)
- Measured measured = 1;
-
- // Estimated metrics.
-
- // (Optional) Sum of estimated fraction of known work remaining for all
- // active elements, as reported by this transform.
- // If not reported, a Runner could extrapolate this from the processed
- // elements.
- // TODO: Handle the case when known work is infinite.
- double fraction_remaining = 2;
-
- // (Optional) Map from local output name to sum of estimated number
- // of elements remaining for this output from all active elements,
- // as reported by this transform.
- // If not reported, a Runner could extrapolate this from the processed
- // elements.
- map<string, int64> output_elements_remaining = 3;
- }
-
- // (Required): Metrics for processed elements.
- ProcessedElements processed_elements = 1;
- // (Required): Metrics for active elements.
- ActiveElements active_elements = 2;
-
- // (Optional): Map from local output name to its watermark.
- // The watermarks reported are tentative, to get a better sense of progress
- // while processing a bundle but before it is committed. At bundle commit
- // time, a Runner needs to also take into account the timers set to compute
- // the actual watermarks.
- map<string, int64> watermarks = 3;
-
- // TODO: Define other transform level system metrics.
- }
-
- // User defined metrics
- message User {
- // TODO: Define it.
- }
-
- map<string, PTransform> ptransforms = 1;
- map<string, User> user = 2;
-}
-
-message ProcessBundleProgressResponse {
- // (Required)
- Metrics metrics = 1;
-}
-
-message ProcessBundleSplitRequest {
- // (Required) A reference to an active process bundle request with the given
- // instruction id.
- string instruction_reference = 1;
-
- // (Required) The fraction of work (when compared to the known amount of work)
- // the process bundle request should try to split at.
- double fraction = 2;
-}
-
-// urn:org.apache.beam:restriction:element-count:1.0
-message ElementCountRestriction {
- // A restriction representing the number of elements that should be processed.
- // Effectively the range [0, count]
- int64 count = 1;
-}
-
-// urn:org.apache.beam:restriction:element-count-skip:1.0
-message ElementCountSkipRestriction {
- // A restriction representing the number of elements that should be skipped.
- // Effectively the range (count, infinity]
- int64 count = 1;
-}
-
-// Each primitive transform that is splittable is defined by a restriction
-// it is currently processing. During splitting, that currently active
-// restriction (R_initial) is split into 2 components:
-// * a restriction (R_done) representing all elements that will be fully
-// processed
-// * a restriction (R_todo) representing all elements that will not be fully
-// processed
-//
-// where:
-// R_initial = R_done ⋃ R_todo
-message PrimitiveTransformSplit {
- // (Required) A reference to a primitive transform with the given id that
- // is part of the active process bundle request with the given instruction
- // id.
- string primitive_transform_reference = 1;
-
- // (Required) A function specification describing the restriction
- // that has been completed by the primitive transform.
- //
- // For example, a remote GRPC source will have a specific urn and data
- // block containing an ElementCountRestriction.
- org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
-
- // (Required) A function specification describing the restriction
- // representing the remainder of work for the primitive transform.
- //
- // For example, a remote GRPC source will have a specific urn and data
- // block containing an ElementCountSkipRestriction.
- org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
-}
-
-message ProcessBundleSplitResponse {
- // (Optional) A set of split responses for a currently active work item.
- //
- // If primitive transform B is a descendant of primitive transform A and both
- // A and B report a split. Then B's restriction is reported as an element
- // restriction pair and thus the fully reported restriction is:
- // R = A_done
- // ⋃ (A_boundary ⋂ B_done)
- // ⋃ (A_boundary ⋂ B_todo)
- // ⋃ A_todo
- // If there is a descendant of B named C, then C would similarly report a
- // set of element pair restrictions.
- //
- // This restriction is processed and completed by the currently active process
- // bundle request:
- // A_done ⋃ (A_boundary ⋂ B_done)
- // and these restrictions will be processed by future process bundle requests:
- // A_boundary ⋂ B_todo (passed to SDF B directly)
- // A_todo (passed to SDF A directly)
-
- // If primitive transform B and C are siblings and descendants of A and A, B,
- // and C report a split. Then B and C's restrictions are relative to A's.
- // R = A_done
- // ⋃ (A_boundary ⋂ B_done)
- // ⋃ (A_boundary ⋂ B_todo)
- // ⋃ (A_boundary ⋂ C_done)
- // ⋃ (A_boundary ⋂ C_todo)
- // ⋃ A_todo
- // If there is no descendant of B or C also reporting a split, then
- // B_boundary = ∅ and C_boundary = ∅
- //
- // This restriction is processed and completed by the currently active process
- // bundle request:
- // A_done ⋃ (A_boundary ⋂ B_done)
- // ⋃ (A_boundary ⋂ C_done)
- // and these restrictions will be processed by future process bundle requests:
- // A_boundary ⋂ B_todo (passed to SDF B directly)
- // A_boundary ⋂ C_todo (passed to SDF C directly)
- // A_todo (passed to SDF A directly)
- //
- // Note that descendants splits should only be reported if it is inexpensive
- // to compute the boundary restriction intersected with descendants splits.
- // Also note, that the boundary restriction may represent a set of elements
- // produced by a parent primitive transform which can not be split at each
- // element or that there are intermediate unsplittable primitive transforms
- // between an ancestor splittable function and a descendant splittable
- // function which may have more than one output per element. Finally note
- // that the descendant splits should only be reported if the split
- // information is relatively compact.
- repeated PrimitiveTransformSplit splits = 1;
-}
-
-/*
- * Data Plane API
- */
-
-// Messages used to represent logical byte streams.
-// Stable
-message Elements {
- // Represents multiple encoded elements in nested context for a given named
- // instruction and target.
- message Data {
- // (Required) A reference to an active instruction request with the given
- // instruction id.
- string instruction_reference = 1;
-
- // (Required) A definition representing a consumer or producer of this data.
- // If received by a harness, this represents the consumer within that
- // harness that should consume these bytes. If sent by a harness, this
- // represents the producer of these bytes.
- //
- // Note that a single element may span multiple Data messages.
- //
- // Note that a sending/receiving pair should share the same target
- // identifier.
- Target target = 2;
-
- // (Optional) Represents a part of a logical byte stream. Elements within
- // the logical byte stream are encoded in the nested context and
- // concatenated together.
- //
- // An empty data block represents the end of stream for the given
- // instruction and target.
- bytes data = 3;
- }
-
- // (Required) A list containing parts of logical byte streams.
- repeated Data data = 1;
-}
-
-// Stable
-service BeamFnData {
- // Used to send data between harnesses.
- rpc Data(
- // A stream of data representing input.
- stream Elements
- ) returns (
- // A stream of data representing output.
- stream Elements
- ) {}
-}
-
-/*
- * State API
- */
-
-message StateRequest {
- // (Required) A unique identifier provided by the SDK which represents this
- // request's execution. The StateResponse corresponding with this request
- // will have the matching id.
- string id = 1;
-
- // (Required) The associated instruction id of the work that is currently
- // being processed. This allows for the runner to associate any modifications
- // to state to be committed with the appropriate work execution.
- string instruction_reference = 2;
-
- // (Required) The state key this request is for.
- StateKey state_key = 3;
-
- // (Required) The action to take on this request.
- oneof request {
- // A request to get state.
- StateGetRequest get = 1000;
-
- // A request to append to state.
- StateAppendRequest append = 1001;
-
- // A request to clear state.
- StateClearRequest clear = 1002;
- }
-}
-
-message StateResponse {
- // (Required) A reference provided by the SDK which represents a request's
- // execution. The StateResponse must have the matching id when responding
- // to the SDK.
- string id = 1;
-
- // (Optional) If this is specified, then the state request has failed.
- // A human readable string representing the reason as to why the request
- // failed.
- string error = 2;
-
- // (Optional) If this is specified, then the result of this state request
- // can be cached using the supplied token.
- bytes cache_token = 3;
-
- // A corresponding response matching the request will be populated.
- oneof response {
- // A response to getting state.
- StateGetResponse get = 1000;
-
- // A response to appending to state.
- StateAppendResponse append = 1001;
-
- // A response to clearing state.
- StateClearResponse clear = 1002;
- }
-}
-
-service BeamFnState {
- // Used to get/append/clear state stored by the runner on behalf of the SDK.
- rpc State(
- // A stream of state instructions requested of the runner.
- stream StateRequest
- ) returns (
- // A stream of responses to state instructions the runner was asked to
- // perform.
- stream StateResponse
- ) {}
-}
-
-message StateKey {
- message Runner {
- // (Required) Opaque information supplied by the runner. Used to support
- // remote references.
- bytes key = 1;
- }
-
- message MultimapSideInput {
- // (Required) The id of the PTransform containing a side input.
- string ptransform_id = 1;
- // (Required) The id of the side input.
- string side_input_id = 2;
- // (Required) The window (after mapping the currently executing element's
- // window into the side input's window domain) encoded in a nested context.
- bytes window = 3;
- // (Required) The key encoded in a nested context.
- bytes key = 4;
- }
-
- message BagUserState {
- // (Required) The id of the PTransform containing user state.
- string ptransform_id = 1;
- // (Required) The id of the user state.
- string user_state_id = 2;
- // (Required) The window encoded in a nested context.
- bytes window = 3;
- // (Required) The key of the currently executing element encoded in a
- // nested context.
- bytes key = 4;
- }
-
- // (Required) One of the following state keys must be set.
- oneof type {
- Runner runner = 1;
- MultimapSideInput multimap_side_input = 2;
- BagUserState bag_user_state = 3;
- // TODO: represent a state key for user map state
- }
-}
-
-// A request to get state.
-message StateGetRequest {
- // (Optional) If specified, signals to the runner that the response
- // should resume from the following continuation token.
- //
- // If unspecified, signals to the runner that the response should start
- // from the beginning of the logical continuable stream.
- bytes continuation_token = 1;
-}
-
-// A response to get state representing a logical byte stream which can be
-// continued using the state API.
-message StateGetResponse {
- // (Optional) If specified, represents a token which can be used with the
- // state API to get the next chunk of this logical byte stream. The end of
- // the logical byte stream is signalled by this field being unset.
- bytes continuation_token = 1;
-
- // Represents a part of a logical byte stream. Elements within
- // the logical byte stream are encoded in the nested context and
- // concatenated together.
- bytes data = 2;
-}
-
-// A request to append state.
-message StateAppendRequest {
- // Represents a part of a logical byte stream. Elements within
- // the logical byte stream are encoded in the nested context and
- // multiple append requests are concatenated together.
- bytes data = 1;
-}
-
-// A response to append state.
-message StateAppendResponse {
-}
-
-// A request to clear state.
-message StateClearRequest {
-}
-
-// A response to clear state.
-message StateClearResponse {
-}
-
-/*
- * Logging API
- *
- * This is very stable. There can be some changes to how we define a LogEntry,
- * to increase/decrease the severity types, the way we format an exception/stack
- * trace, or the log site.
- */
-
-// A log entry
-message LogEntry {
- // A list of log entries, enables buffering and batching of multiple
- // log messages using the logging API.
- message List {
- // (Required) One or more log messages.
- repeated LogEntry log_entries = 1;
- }
-
- // The severity of the event described in a log entry, expressed as one of the
- // severity levels listed below. For your reference, the levels are
- // assigned the listed numeric values. The effect of using numeric values
- // other than those listed is undefined.
- //
- // If you are writing log entries, you should map other severity encodings to
- // one of these standard levels. For example, you might map all of
- // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
- //
- // This list is intentionally not comprehensive; the intent is to provide a
- // common set of "good enough" severity levels so that logging front ends
- // can provide filtering and searching across log types. Users of the API are
- // free not to use all severity levels in their log messages.
- message Severity {
- enum Enum {
- UNSPECIFIED = 0;
- // Trace level information, also the default log level unless
- // another severity is specified.
- TRACE = 1;
- // Debugging information.
- DEBUG = 2;
- // Normal events.
- INFO = 3;
- // Normal but significant events, such as start up, shut down, or
- // configuration.
- NOTICE = 4;
- // Warning events might cause problems.
- WARN = 5;
- // Error events are likely to cause problems.
- ERROR = 6;
- // Critical events cause severe problems or brief outages and may
- // indicate that a person must take action.
- CRITICAL = 7;
- }
- }
-
- // (Required) The severity of the log statement.
- Severity.Enum severity = 1;
-
- // (Required) The time at which this log statement occurred.
- google.protobuf.Timestamp timestamp = 2;
-
- // (Required) A human readable message.
- string message = 3;
-
- // (Optional) An optional trace of the functions involved. For example, in
- // Java this can include multiple causes and multiple suppressed exceptions.
- string trace = 4;
-
- // (Optional) A reference to the instruction this log statement is associated
- // with.
- string instruction_reference = 5;
-
- // (Optional) A reference to the primitive transform this log statement is
- // associated with.
- string primitive_transform_reference = 6;
-
- // (Optional) Human-readable name of the function or method being invoked,
- // with optional context such as the class or package name. The format can
- // vary by language. For example:
- // qual.if.ied.Class.method (Java)
- // dir/package.func (Go)
- // module.function (Python)
- // file.cc:382 (C++)
- string log_location = 7;
-
- // (Optional) The name of the thread this log statement is associated with.
- string thread = 8;
-}
-
-message LogControl {
-}
-
-// Stable
-service BeamFnLogging {
- // Allows for the SDK to emit log entries which the runner can
- // associate with the active job.
- rpc Logging(
- // A stream of log entries batched into lists emitted by the SDK harness.
- stream LogEntry.List
- ) returns (
- // A stream of log control messages used to configure the SDK.
- stream LogControl
- ) {}
-}
-
-/*
- * Environment types
- */
-// A Docker container configuration for launching the SDK harness to execute
-// user specified functions.
-message DockerContainer {
- // (Required) A pipeline level unique id which can be used as a reference to
- // refer to this Docker container configuration.
- string id = 1;
-
- // (Required) The Docker container URI
- // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
- string uri = 2;
-
- // (Optional) Docker registry specification.
- // If unspecified, the uri is expected to be able to be fetched without
- // requiring additional configuration by a runner.
- string registry_reference = 3;
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto b/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
deleted file mode 100644
index b0cd6b4..0000000
--- a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Provision API, for communicating with a runner
- * for job and environment provisioning information over GRPC.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.fn.v1;
-
-option java_package = "org.apache.beam.fn.v1";
-option java_outer_classname = "ProvisionApi";
-
-import "google/protobuf/struct.proto";
-
-// A service to provide runtime provisioning information to the SDK harness
-// worker instances -- such as pipeline options, resource constraints and
-// other job metadata -- needed by an SDK harness instance to initialize.
-service ProvisionService {
- // Get provision information for the SDK harness worker instance.
- rpc GetProvisionInfo(GetProvisionInfoRequest) returns (GetProvisionInfoResponse);
-}
-
-// A request to get the provision info of an SDK harness worker instance.
-message GetProvisionInfoRequest { }
-
-// A response containing the provision info of an SDK harness worker instance.
-message GetProvisionInfoResponse {
- ProvisionInfo info = 1;
-}
-
-// Runtime provisioning information for an SDK harness worker instance,
-// such as pipeline options, resource constraints and other job metadata.
-message ProvisionInfo {
- // (required) The job ID.
- string job_id = 1;
- // (required) The job name.
- string job_name = 2;
-
- // (required) Pipeline options. For non-template jobs, the options are
- // identical to what is passed to job submission.
- google.protobuf.Struct pipeline_options = 3;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
deleted file mode 100644
index f37b2d3..0000000
--- a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
+++ /dev/null
@@ -1,195 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This file is broken into multiple sections delimited by ---. Each section specifies a set of
-# reference encodings for a single standardized coder used in a specific context.
-#
-# Each section contains up to 3 properties:
-#
-# coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
-# nested: a boolean meaning whether the coder was used in the nested context. Missing means to
-# test both contexts, a shorthand for when the coder is invariant across context.
-# examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
-# The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
-# one of a few standard JSON types such as numbers, strings, dicts that map naturally
-# to the type encoded by the coder.
-#
-# These choices were made to strike a balance between portability, ease of use, and simple
-# legibility of this file itself.
-#
-# It is expected that future work will move the `coder` field into a format that it would be
-# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
-#
-# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
-
-
-coder:
- urn: "urn:beam:coders:bytes:0.1"
-nested: false
-examples:
- "abc": abc
- "ab\0c": "ab\0c"
-
----
-
-coder:
- urn: "urn:beam:coders:bytes:0.1"
-nested: true
-examples:
- "\u0003abc": abc
- "\u0004ab\0c": "ab\0c"
- "\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|":
- " 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|"
-
----
-
-coder:
- urn: "urn:beam:coders:varint:0.1"
-examples:
- "\0": 0
- "\u0001": 1
- "\u000A": 10
- "\u00c8\u0001": 200
- "\u00e8\u0007": 1000
- "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
-
----
-
-coder:
- urn: "urn:beam:coders:kv:0.1"
- components: [{urn: "urn:beam:coders:bytes:0.1"},
- {urn: "urn:beam:coders:varint:0.1"}]
-examples:
- "\u0003abc\0": {key: abc, value: 0}
- "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
-
----
-
-coder:
- urn: "urn:beam:coders:kv:0.1"
- components: [{urn: "urn:beam:coders:bytes:0.1"},
- {urn: "urn:beam:coders:bytes:0.1"}]
-nested: false
-examples:
- "\u0003abcdef": {key: abc, value: def}
- "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
- urn: "urn:beam:coders:kv:0.1"
- components: [{urn: "urn:beam:coders:bytes:0.1"},
- {urn: "urn:beam:coders:bytes:0.1"}]
-nested: true
-examples:
- "\u0003abc\u0003def": {key: abc, value: def}
- "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
- urn: "urn:beam:coders:interval_window:0.1"
-examples:
- "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
- "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
- "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
- "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
-
----
-
-coder:
- urn: "urn:beam:coders:stream:0.1"
- components: [{urn: "urn:beam:coders:varint:0.1"}]
-examples:
- "\0\0\0\u0001\0": [0]
- "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
- "\0\0\0\0": []
-
----
-
-coder:
- urn: "urn:beam:coders:stream:0.1"
- components: [{urn: "urn:beam:coders:bytes:0.1"}]
-examples:
- "\0\0\0\u0001\u0003abc": ["abc"]
- "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
- "\0\0\0\0": []
-
----
-
-coder:
- urn: "urn:beam:coders:stream:0.1"
- components: [{urn: "urn:beam:coders:bytes:0.1"}]
- # This is for iterables of unknown length, where the encoding is not
- # deterministic.
- non_deterministic: True
-examples:
- "\u00ff\u00ff\u00ff\u00ff\u0000": []
- "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
- "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
-
----
-
-coder:
- urn: "urn:beam:coders:stream:0.1"
- components: [{urn: "urn:beam:coders:global_window:0.1"}]
-examples:
- "\0\0\0\u0001": [""]
-
----
-
-coder:
- urn: "urn:beam:coders:global_window:0.1"
-examples:
- "": ""
-
----
-
-# All windowed values consist of pane infos that represent NO_FIRING until full support is added
-# in the Python SDK (BEAM-1522).
-coder:
- urn: "urn:beam:coders:windowed_value:0.1"
- components: [{urn: "urn:beam:coders:varint:0.1"},
- {urn: "urn:beam:coders:global_window:0.1"}]
-examples:
- "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
- value: 2,
- timestamp: 1454293425000,
- pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
- windows: ["global"]
- }
-
----
-
-coder:
- urn: "urn:beam:coders:windowed_value:0.1"
- components: [{urn: "urn:beam:coders:varint:0.1"},
- {urn: "urn:beam:coders:interval_window:0.1"}]
-examples:
- "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
- value: 4,
- timestamp: -400000,
- pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
- windows: [{end: 1454293425000, span: 280000}]
- }
-
- "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
- value: 2,
- timestamp: -100,
- pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
- windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
- }
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index d950759..d4ecad0 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -33,6 +33,5 @@
<name>Apache Beam :: SDKs :: Common</name>
<modules>
- <module>fn-api</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 36271eb..6c74cd9 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -302,7 +302,7 @@
<!-- test dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
+ <artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
http://git-wip-us.apache.org/repos/asf/beam/blob/e233af9a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 7f9156c..de24f7f 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -51,10 +51,10 @@
<include>com.google.guava:guava</include>
<!-- java harness dependencies that are not staged -->
<include>org.apache.beam:beam-model-pipeline</include>
+ <include>org.apache.beam:beam-model-fn-execution</include>
<include>org.apache.beam:beam-runners-core-construction-java</include>
<include>org.apache.beam:beam-runners-core-java</include>
<include>org.apache.beam:beam-runners-google-cloud-dataflow-java</include>
- <include>org.apache.beam:beam-sdks-common-fn-api</include>
<include>io.netty:netty-transport-native-epoll</include>
</includes>
</artifactSet>
@@ -131,6 +131,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-model-fn-execution</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
@@ -164,11 +169,6 @@
</dependency>
<dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>