You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/30 20:48:56 UTC
[5/6] beam git commit: A proposal for a portability framework to
execute user definable functions.
A proposal for a portability framework to execute user definable functions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4b2bec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4b2bec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4b2bec
Branch: refs/heads/master
Commit: 0b4b2becb45b9f637ba31f599ebe8be0331bd633
Parents: 582c4a8
Author: Luke Cwik <lc...@google.com>
Authored: Thu Jan 19 15:16:55 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jan 30 12:47:55 2017 -0800
----------------------------------------------------------------------
pom.xml | 36 +-
runners/apex/pom.xml | 2 +-
sdks/common/fn-api/pom.xml | 111 +++
.../fn-api/src/main/proto/beam_fn_api.proto | 771 +++++++++++++++++++
sdks/common/pom.xml | 38 +
.../src/main/resources/beam/findbugs-filter.xml | 32 +-
sdks/java/harness/pom.xml | 167 ++++
.../org/apache/beam/fn/harness/FnHarness.java | 131 ++++
.../harness/channel/ManagedChannelFactory.java | 80 ++
.../harness/channel/SocketAddressFactory.java | 64 ++
.../beam/fn/harness/channel/package-info.java | 22 +
.../fn/harness/control/BeamFnControlClient.java | 165 ++++
.../harness/control/ProcessBundleHandler.java | 334 ++++++++
.../fn/harness/control/RegisterHandler.java | 92 +++
.../beam/fn/harness/control/package-info.java | 22 +
.../BeamFnDataBufferingOutboundObserver.java | 135 ++++
.../beam/fn/harness/data/BeamFnDataClient.java | 64 ++
.../fn/harness/data/BeamFnDataGrpcClient.java | 122 +++
.../harness/data/BeamFnDataGrpcMultiplexer.java | 140 ++++
.../harness/data/BeamFnDataInboundObserver.java | 81 ++
.../beam/fn/harness/data/package-info.java | 22 +
.../fn/harness/fake/FakeAggregatorFactory.java | 52 ++
.../beam/fn/harness/fake/FakeStepContext.java | 70 ++
.../beam/fn/harness/fake/package-info.java | 22 +
.../harness/fn/CloseableThrowingConsumer.java | 23 +
.../beam/fn/harness/fn/ThrowingBiFunction.java | 32 +
.../beam/fn/harness/fn/ThrowingConsumer.java | 32 +
.../beam/fn/harness/fn/ThrowingFunction.java | 32 +
.../beam/fn/harness/fn/ThrowingRunnable.java | 30 +
.../apache/beam/fn/harness/fn/package-info.java | 22 +
.../fn/harness/logging/BeamFnLoggingClient.java | 308 ++++++++
.../beam/fn/harness/logging/package-info.java | 22 +
.../apache/beam/fn/harness/package-info.java | 22 +
.../beam/fn/harness/stream/AdvancingPhaser.java | 36 +
.../harness/stream/BufferingStreamObserver.java | 166 ++++
.../fn/harness/stream/DirectStreamObserver.java | 71 ++
.../ForwardingClientResponseObserver.java | 63 ++
.../harness/stream/StreamObserverFactory.java | 91 +++
.../beam/fn/harness/stream/package-info.java | 22 +
.../beam/runners/core/BeamFnDataReadRunner.java | 104 +++
.../runners/core/BeamFnDataWriteRunner.java | 87 +++
.../beam/runners/core/BoundedSourceRunner.java | 105 +++
.../apache/beam/runners/core/package-info.java | 22 +
.../apache/beam/fn/harness/FnHarnessTest.java | 130 ++++
.../channel/ManagedChannelFactoryTest.java | 74 ++
.../channel/SocketAddressFactoryTest.java | 56 ++
.../control/BeamFnControlClientTest.java | 182 +++++
.../control/ProcessBundleHandlerTest.java | 674 ++++++++++++++++
.../fn/harness/control/RegisterHandlerTest.java | 80 ++
...BeamFnDataBufferingOutboundObserverTest.java | 142 ++++
.../harness/data/BeamFnDataGrpcClientTest.java | 309 ++++++++
.../data/BeamFnDataGrpcMultiplexerTest.java | 96 +++
.../data/BeamFnDataInboundObserverTest.java | 116 +++
.../logging/BeamFnLoggingClientTest.java | 169 ++++
.../fn/harness/stream/AdvancingPhaserTest.java | 48 ++
.../stream/BufferingStreamObserverTest.java | 146 ++++
.../stream/DirectStreamObserverTest.java | 139 ++++
.../ForwardingClientResponseObserverTest.java | 60 ++
.../stream/StreamObserverFactoryTest.java | 84 ++
.../beam/fn/harness/test/TestExecutors.java | 85 ++
.../beam/fn/harness/test/TestExecutorsTest.java | 160 ++++
.../beam/fn/harness/test/TestStreams.java | 162 ++++
.../beam/fn/harness/test/TestStreamsTest.java | 84 ++
.../runners/core/BeamFnDataReadRunnerTest.java | 187 +++++
.../runners/core/BeamFnDataWriteRunnerTest.java | 155 ++++
.../runners/core/BoundedSourceRunnerTest.java | 113 +++
sdks/java/pom.xml | 1 +
sdks/pom.xml | 1 +
68 files changed, 7514 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d09bf59..a53453b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
<google-clients.version>1.22.0</google-clients.version>
<google-cloud-bigdataoss.version>1.4.5</google-cloud-bigdataoss.version>
<google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
- <guava.version>19.0</guava.version>
+ <guava.version>20.0</guava.version>
<grpc.version>1.0.1</grpc.version>
<hamcrest.version>1.3</hamcrest.version>
<jackson.version>2.7.2</jackson.version>
@@ -127,7 +127,7 @@
<mockito.version>1.9.5</mockito.version>
<netty.version>4.1.3.Final</netty.version>
<os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version>
- <protobuf.version>3.0.0</protobuf.version>
+ <protobuf.version>3.1.0</protobuf.version>
<pubsub.version>v1-rev10-1.22.0</pubsub.version>
<slf4j.version>1.7.14</slf4j.version>
<stax2.version>3.1.4</stax2.version>
@@ -314,6 +314,11 @@
<dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-fn-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -729,6 +734,13 @@
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <version>${netty.version}</version>
+ <classifier>linux-x86_64</classifier>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
@@ -741,6 +753,12 @@
</dependency>
<dependency>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ <version>2.0.13</version>
+ </dependency>
+
+ <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
@@ -824,9 +842,23 @@
</dependencyManagement>
<build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>${os-maven-plugin.version}</version>
+ </extension>
+ </extensions>
+
<pluginManagement>
<plugins>
<plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ </plugin>
+
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 7ae07e2..5e16083 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -226,7 +226,7 @@
<ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:20.0</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
new file mode 100644
index 0000000..72788d0
--- /dev/null
+++ b/sdks/common/fn-api/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <packaging>jar</packaging>
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-parent</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-common-fn-api</artifactId>
+ <name>Apache Beam :: SDKs :: Common :: Fn API</name>
+ <description>This artifact generates the stub bindings.</description>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ <resource>
+ <directory>${project.build.directory}/original_sources_to_package</directory>
+ </resource>
+ </resources>
+
+ <plugins>
+ <!-- Skip the checkstyle plugin on generated code -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <!-- Skip the findbugs plugin on generated code -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
new file mode 100644
index 0000000..3ac0fbf
--- /dev/null
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Fn API and bootstrapping.
+ *
+ * TODO: Usage of plural names in lists looks awkward in Java
+ * e.g. getOutputsMap, addCodersBuilder
+ *
+ * TODO: gRPC / proto field names conflict with generated code
+ * e.g. "class" in java, "output" in python
+ */
+
+syntax = "proto3";
+
+/* TODO: Consider consolidating common components in another package
+ * and language namespaces for re-use with Runner Api.
+ */
+
+package org.apache.beam.fn.v1;
+
+option java_package = "org.apache.beam.fn.v1";
+option java_outer_classname = "BeamFnApi";
+
+import "google/protobuf/any.proto";
+import "google/protobuf/timestamp.proto";
+
+/*
+ * Constructs that define the pipeline shape.
+ *
+ * These are mostly unstable due to the missing pieces to be shared with
+ * the Runner Api like windowing strategy, display data, .... There are still
+ * some modelling questions related to whether a side input is modelled
+ * as another field on a PrimitiveTransform or as part of inputs and we
+ * still are missing things like the CompositeTransform.
+ */
+
+// A representation of an input or output definition on a primitive transform.
+// Stable
+message Target {
+ // A repeated list of target definitions.
+ message List {
+ repeated Target target = 1;
+ }
+
+ // (Required) The id of the PrimitiveTransform which is the target.
+ int64 primitive_transform_reference = 1;
+
+ // (Required) The local name of an input or output defined on the primitive
+ // transform.
+ string name = 2;
+}
+
+// Information defining a PCollection
+message PCollection {
+ // (Required) A reference to a coder.
+ int64 coder_reference = 1;
+
+ // TODO: Windowing strategy, ...
+}
+
+// A primitive transform within Apache Beam.
+message PrimitiveTransform {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ int64 id = 1;
+
+ // (Required) A function spec that is used by this primitive
+ // transform to process data.
+ FunctionSpec function_spec = 2;
+
+ // A map of distinct input names to target definitions.
+ // For example, in CoGbk this represents the tag name associated with each
+ // distinct input name and a list of primitive transforms that are associated
+ // with the specified input.
+ map<string, Target.List> inputs = 3;
+
+ // A map from local output name to PCollection definitions. For example, in
+ // DoFn this represents the tag name associated with each distinct output.
+ map<string, PCollection> outputs = 4;
+
+ // TODO: Should we model side inputs as a special type of input for a
+ // primitive transform or should it be modeled as the relationship that
+ // the predecessor input will be a view primitive transform.
+ // A map from side input names to side inputs.
+ map<string, SideInput> side_inputs = 5;
+
+ // The user name of this step.
+ // TODO: This should really be in display data and not at this level
+ string step_name = 6;
+}
+
+/*
+ * User Definable Functions
+ *
+ * This is still unstable mainly due to how we model the side input.
+ */
+
+// Defines the common elements of user-definable functions, to allow the SDK to
+// express the information the runner needs to execute work.
+// Stable
+message FunctionSpec {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ int64 id = 1;
+
+ // (Required) A globally unique name representing this user definable
+ // function.
+ //
+ // User definable functions use the urn encodings registered such that another
+ // may implement the user definable function within another language.
+ //
+ // For example:
+ // urn:org.apache.beam:coder:kv:1.0
+ string urn = 2;
+
+ // (Required) Reference to specification of execution environment required to
+ // invoke this function.
+ int64 environment_reference = 3;
+
+ // Data used to parameterize this function. Depending on the urn, this may be
+ // optional or required.
+ google.protobuf.Any data = 4;
+}
+
+message SideInput {
+ // TODO: Coder?
+
+ // For RunnerAPI.
+ Target input = 1;
+
+ // For FnAPI.
+ FunctionSpec view_fn = 2;
+}
+
+// Defines how to encode values into byte streams and decode values from byte
+// streams. A coder can be parameterized by additional properties which may or
+// may not be language agnostic.
+//
+// Coders using the urn:org.apache.beam:coder namespace must have their
+// encodings registered such that another may implement the encoding within
+// another language.
+//
+// For example:
+// urn:org.apache.beam:coder:kv:1.0
+// urn:org.apache.beam:coder:iterable:1.0
+// Stable
+message Coder {
+ // TODO: This looks weird when compared to the other function specs
+ // which use URN to differentiate themselves. Should "Coder" be embedded
+ // inside the FunctionSpec data block.
+
+ // The data associated with this coder used to reconstruct it.
+ FunctionSpec function_spec = 1;
+
+ // A list of component coder references.
+ //
+ // For a key-value coder, there must be exactly two component coder references
+ // where the first reference represents the key coder and the second reference
+ // is the value coder.
+ //
+ // For an iterable coder, there must be exactly one component coder reference
+ // representing the value coder.
+ //
+ // TODO: Perhaps this is redundant with the data of the FunctionSpec
+ // for known coders?
+ repeated int64 component_coder_reference = 2;
+}
+
+// A descriptor for connecting to a remote port using the Beam Fn Data API.
+// Allows for communication between two environments (for example between the
+// runner and the SDK).
+// Stable
+message RemoteGrpcPort {
+ // (Required) An API descriptor which describes where to
+ // connect to including any authentication that is required.
+ ApiServiceDescriptor api_service_descriptor = 1;
+}
+
+/*
+ * Control Plane API
+ *
+ * Progress reporting and splitting still need further vetting. Also, this may change
+ * with the addition of new types of instructions/responses related to metrics.
+ */
+
+// An API that describes the work that a SDK Fn Harness is meant to do.
+// Stable
+service BeamFnControl {
+ // Instructions sent by the runner to the SDK requesting different types
+ // of work.
+ rpc Control(
+ // A stream of responses to instructions the SDK was asked to perform.
+ stream InstructionResponse
+ ) returns (
+ // A stream of instructions requested of the SDK to be performed.
+ stream InstructionRequest
+ ) {}
+}
+
+// A request sent by a runner which the SDK is asked to fulfill.
+// Stable
+message InstructionRequest {
+ // (Required) A unique identifier provided by the runner which represents
+ // this request's execution. The InstructionResponse MUST have the matching id.
+ int64 instruction_id = 1;
+
+ // (Required) A request that the SDK Harness needs to interpret.
+ oneof request {
+ RegisterRequest register = 1000;
+ ProcessBundleRequest process_bundle = 1001;
+ ProcessBundleProgressRequest process_bundle_progress = 1002;
+ ProcessBundleSplitRequest process_bundle_split = 1003;
+ }
+}
+
+// The response for an associated request the SDK had been asked to fulfill.
+// Stable
+message InstructionResponse {
+ // (Required) A reference provided by the runner which represents a request's
+ // execution. The InstructionResponse MUST have the matching id when
+ // responding to the runner.
+ int64 instruction_id = 1;
+
+ // If this is specified, then this instruction has failed.
+ // A human readable string representing the reason as to why processing has
+ // failed.
+ string error = 2;
+
+ // If the instruction did not fail, it is required to return an equivalent
+ // response type depending on the request this matches.
+ oneof response {
+ RegisterResponse register = 1000;
+ ProcessBundleResponse process_bundle = 1001;
+ ProcessBundleProgressResponse process_bundle_progress = 1002;
+ ProcessBundleSplitResponse process_bundle_split = 1003;
+ }
+}
+
+// A list of objects which can be referred to by the runner in
+// future requests.
+// Stable
+message RegisterRequest {
+ // (Optional) The set of descriptors used to process bundles.
+ repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
+}
+
+// Stable
+message RegisterResponse {
+}
+
+// A descriptor of references used when processing a bundle.
+// Stable
+message ProcessBundleDescriptor {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ int64 id = 1;
+
+ // (Required) A list of primitive transforms that should
+ // be used to construct the bundle processing graph.
+ repeated PrimitiveTransform primitive_transform = 2;
+
+ // (Required) The set of all coders referenced in this bundle.
+ repeated Coder coders = 4;
+}
+
+// A request to process a given bundle.
+// Stable
+message ProcessBundleRequest {
+ int64 process_bundle_descriptor_reference = 1;
+}
+
+// Stable
+message ProcessBundleResponse {
+}
+
+message ProcessBundleProgressRequest {
+ // (Required) A reference to an active process bundle request with the given
+ // instruction id.
+ int64 instruction_reference = 1;
+}
+
+message ProcessBundleProgressResponse {
+ // (Required) The finished amount of work. A monotonically increasing
+ // unitless measure of work finished.
+ double finished_work = 1;
+
+ // (Required) The known amount of backlog for the process bundle request.
+ // Computed as:
+ // (estimated known work - finished work) / finished work
+ double backlog = 2;
+}
+
+message ProcessBundleSplitRequest {
+ // (Required) A reference to an active process bundle request with the given
+ // instruction id.
+ int64 instruction_reference = 1;
+
+ // (Required) The fraction of work (when compared to the known amount of work)
+ // the process bundle request should try to split at.
+ double fraction = 2;
+}
+
+// urn:org.apache.beam:restriction:element-count:1.0
+message ElementCountRestriction {
+ // A restriction representing the number of elements that should be processed.
+ // Effectively the range [0, count]
+ int64 count = 1;
+}
+
+// urn:org.apache.beam:restriction:element-count-skip:1.0
+message ElementCountSkipRestriction {
+ // A restriction representing the number of elements that should be skipped.
+ // Effectively the range (count, infinity]
+ int64 count = 1;
+}
+
+// Each primitive transform that is splittable is defined by a restriction
+// it is currently processing. During splitting, that currently active
+// restriction (R_initial) is split into 2 components:
+// * a restriction (R_done) representing all elements that will be fully
+// processed
+// * a restriction (R_todo) representing all elements that will not be fully
+// processed
+//
+// where:
+// R_initial = R_done \u22c3 R_todo
+message PrimitiveTransformSplit {
+ // (Required) A reference to a primitive transform with the given id that
+ // is part of the active process bundle request with the given instruction
+ // id.
+ int64 primitive_transform_reference = 1;
+
+ // (Required) A function specification describing the restriction
+ // that has been completed by the primitive transform.
+ //
+ // For example, a remote GRPC source will have a specific urn and data
+ // block containing an ElementCountRestriction.
+ FunctionSpec completed_restriction = 2;
+
+ // (Required) A function specification describing the restriction
+ // representing the remainder of work for the primitive transform.
+ //
+ // For example, a remote GRPC source will have a specific urn and data
+ // block containing an ElementCountSkipRestriction.
+ FunctionSpec remaining_restriction = 3;
+}
+
+message ProcessBundleSplitResponse {
+ // (Optional) A set of split responses for a currently active work item.
+ //
+ // If primitive transform B is a descendant of primitive transform A and both
+ // A and B report a split. Then B's restriction is reported as an element
+ // restriction pair and thus the fully reported restriction is:
+ // R = A_done
+ // \u22c3 (A_boundary \u22c2 B_done)
+ // \u22c3 (A_boundary \u22c2 B_todo)
+ // \u22c3 A_todo
+ // If there is a descendant of B named C, then C would similarly report a
+ // set of element pair restrictions.
+ //
+ // This restriction is processed and completed by the currently active process
+ // bundle request:
+ // A_done \u22c3 (A_boundary \u22c2 B_done)
+ // and these restrictions will be processed by future process bundle requests:
+ // A_boundary \u22c2 B_todo (passed to SDF B directly)
+ // A_todo (passed to SDF A directly)
+
+ // If primitive transform B and C are siblings and descendants of A and A, B,
+ // and C report a split. Then B and C's restrictions are relative to A's.
+ // R = A_done
+ // \u22c3 (A_boundary \u22c2 B_done)
+ // \u22c3 (A_boundary \u22c2 B_todo)
+ // \u22c3 (A_boundary \u22c2 C_done)
+ // \u22c3 (A_boundary \u22c2 C_todo)
+ // \u22c3 A_todo
+ // If there is no descendant of B or C also reporting a split, then
+ // B_boundary = \u2205 and C_boundary = \u2205
+ //
+ // This restriction is processed and completed by the currently active process
+ // bundle request:
+ // A_done \u22c3 (A_boundary \u22c2 B_done)
+ // \u22c3 (A_boundary \u22c2 C_done)
+ // and these restrictions will be processed by future process bundle requests:
+ // A_boundary \u22c2 B_todo (passed to SDF B directly)
+ // A_boundary \u22c2 C_todo (passed to SDF C directly)
+ // A_todo (passed to SDF A directly)
+ //
+ // Note that descendant splits should only be reported if it is inexpensive
+ // to compute the boundary restriction intersected with descendant splits.
+ // Also note, that the boundary restriction may represent a set of elements
+ // produced by a parent primitive transform which can not be split at each
+ // element or that there are intermediate unsplittable primitive transforms
+ // between an ancestor splittable function and a descendant splittable
+ // function which may have more than one output per element. Finally note
+ // that the descendant splits should only be reported if the split
+ // information is relatively compact.
+ repeated PrimitiveTransformSplit splits = 1;
+}
+
+/*
+ * Data Plane API
+ */
+
+// Messages used to represent logical byte streams.
+// Stable
+message Elements {
+ // Represents multiple encoded elements in nested context for a given named
+ // instruction and target.
+ message Data {
+ // (Required) A reference to an active instruction request with the given
+ // instruction id.
+ int64 instruction_reference = 1;
+
+ // (Required) A definition representing a consumer or producer of this data.
+ // If received by a harness, this represents the consumer within that
+ // harness that should consume these bytes. If sent by a harness, this
+ // represents the producer of these bytes.
+ //
+ // Note that a single element may span multiple Data messages.
+ //
+ // Note that a sending/receiving pair should share the same target
+ // identifier.
+ Target target = 2;
+
+ // (Optional) Represents a part of a logical byte stream. Elements within
+ // the logical byte stream are encoded in the nested context and
+ // concatenated together.
+ //
+ // An empty data block represents the end of stream for the given
+ // instruction and target.
+ bytes data = 3;
+ }
+
+ // (Required) A list containing parts of logical byte streams.
+ repeated Data data = 1;
+}
+
+// Stable
+service BeamFnData {
+ // Used to send data between harnesses.
+ rpc Data(
+ // A stream of data representing input.
+ stream Elements
+ ) returns (
+ // A stream of data representing output.
+ stream Elements
+ ) {}
+}
+
+/*
+ * State API
+ *
+ * This is just a high level sketch of how this could work. There is still
+ * a lot of work with respect to how the key spaces for the different types
+ * of access required (side inputs, user state, ...) and how state caching
+ * works across bundles.
+ */
+
+message StateRequest {
+ // (Required) A unique identifier provided by the SDK which represents this
+ // request's execution. The StateResponse must have the matching id.
+ int64 id = 1;
+
+ // (Required) The associated instruction id of the work that is currently
+ // being processed. This allows for the runner to associate any modifications
+ // to state to be committed with the appropriate work execution.
+ int64 instruction_reference = 2;
+
+ // At least one of the following fields should be populated.
+ // Also, no request should use a state key referred to in another state key.
+
+ // (Optional) A request to get state.
+ repeated StateGetRequest get = 3;
+
+ // (Optional) A request to append to state.
+ repeated StateAppendRequest append = 4;
+
+ // (Optional) A request to clear state.
+ repeated StateClearRequest clear = 5;
+}
+
+message StateResponse {
+ // (Required) A reference provided by the SDK which represents a request's
+ // execution. The StateResponse must have the matching id when responding
+ // to the SDK.
+ int64 id = 1;
+
+ // (Required) The associated instruction id of the work that is currently
+ // being processed.
+ int64 instruction_reference = 2;
+
+ // (Required) A key to associate with the version of this state. Allows for
+ // SDKs to share state across work items if they have the same cache key and
+ // state key.
+ bytes cache_key = 3;
+
+ // (Optional) If this is specified, then the state request has failed.
+ // A human readable string representing the reason as to why the request
+ // failed.
+ string error = 4;
+
+ // For every field populated in the StateRequest, there is a matching field in
+ // the StateResponse.
+
+ // (Optional) A response to getting state.
+ repeated StateGetResponse get = 5;
+
+ // (Optional) A response to appending to state.
+ repeated StateAppendResponse append = 6;
+
+ // (Optional) A response to clearing state.
+ repeated StateClearResponse clear = 7;
+}
+
+service BeamFnState {
+ // Used to get/append/clear state stored by the runner on behalf of the SDK.
+ rpc State(
+ // A stream of state instructions requested of the runner.
+ stream StateRequest
+ ) returns (
+ // A stream of responses to state instructions the runner was asked to
+ // perform.
+ stream StateResponse
+ ) {}
+}
+
+
+// TODO: Resolve with the other State API.
+service SimpleBeamFnState {
+ // Gets the elements associated with the given key.
+ rpc Get(StateKey) returns (Elements.Data) {}
+ // Appends elements to a given state bag.
+ rpc Append(SimpleStateAppendRequest) returns (Empty) {}
+ // Clears a given state bag.
+ rpc Clear(StateKey) returns (Empty) {}
+}
+
+message Empty {
+}
+
+message SimpleStateAppendRequest {
+ StateKey state_key = 1;
+ repeated bytes data = 2;
+}
+
+message StateKey {
+ // (Required) Represents the namespace for the state. If this state is for a
+ // DoFn, then this reference is expected to point to the DoFn. If this state
+ // is for a side input, then this is expected to reference the ViewFn.
+ int64 function_spec_reference = 1;
+
+ // (Required) The bytes of the window which this state request is for encoded
+ // in the outer context.
+ bytes window = 2;
+
+ // (Required) The user key for which the value was encoded in the outer
+ // context.
+ bytes key = 3;
+}
+
+message StateKeyOrIterable {
+ // One of the two fields below is required to be set.
+ // If state key is set, then the State API should be invoked to fetch the
+ // values allowing one to restart the iterable. Otherwise the bytes for the
+ // entire iterable are represented and should be decoded using an iterable
+ // coder using the outer context.
+ StateKey state_key = 1;
+ repeated bytes iterable = 2;
+}
+
+// A request to get state for the given state key.
+message StateGetRequest {
+ // A state key encoded in the outer context.
+ StateKey state_key = 1;
+}
+
+// A response to get state for the given state key.
+message StateGetResponse {
+ // A state key encoded in the outer context.
+ StateKey state_key = 1;
+
+ oneof state {
+ // A description of an input port which will stream the state data.
+ RemoteGrpcPort remote_grpc_port = 1000;
+ }
+}
+
+// A request to append state for the given state key.
+message StateAppendRequest {
+ // A state key encoded in the outer context.
+ StateKey state_key = 1;
+}
+
+// A response to append state for the given state key.
+message StateAppendResponse {
+ // A state key encoded in the outer context.
+ StateKey state_key = 1;
+
+ oneof state {
+ // A description of an output port which to stream the state data to.
+ RemoteGrpcPort remote_grpc_port = 1000;
+ }
+}
+
+// A request to clear state for the given state key.
+message StateClearRequest {
+ // A state key encoded in the outer context.
+ StateKey state_key = 1;
+}
+
+// A response to clear state for the given state key.
+message StateClearResponse {
+}
+
+/*
+ * Logging API
+ *
+ * This is very stable. There can be some changes to how we define a LogEntry,
+ * to increase/decrease the severity types, the way we format an exception/stack
+ * trace, or the log site.
+ */
+
+// A log entry
+message LogEntry {
+ // A list of log entries, enables buffering and batching of multiple
+ // log messages using the logging API.
+ message List {
+ // (Required) One or more log messages.
+ repeated LogEntry log_entries = 1;
+ }
+
+ // The severity of the event described in a log entry, expressed as one of the
+ // severity levels listed below. For your reference, the levels are
+ // assigned the listed numeric values. The effect of using numeric values
+ // other than those listed is undefined.
+ //
+ // If you are writing log entries, you should map other severity encodings to
+ // one of these standard levels. For example, you might map all of
+ // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
+ //
+ // This list is intentionally not comprehensive; the intent is to provide a
+ // common set of "good enough" severity levels so that logging front ends
+ // can provide filtering and searching across log types. Users of the API are
+ // free not to use all severity levels in their log messages.
+ enum Severity {
+ // Trace level information, also the default log level unless
+ // another severity is specified.
+ TRACE = 0;
+ // Debugging information.
+ DEBUG = 10;
+ // Normal events.
+ INFO = 20;
+ // Normal but significant events, such as start up, shut down, or
+ // configuration.
+ NOTICE = 30;
+ // Warning events might cause problems.
+ WARN = 40;
+ // Error events are likely to cause problems.
+ ERROR = 50;
+ // Critical events cause severe problems or brief outages and may
+ // indicate that a person must take action.
+ CRITICAL = 60;
+ }
+
+ // (Required) The severity of the log statement.
+ Severity severity = 1;
+
+ // (Required) The time at which this log statement occurred.
+ google.protobuf.Timestamp timestamp = 2;
+
+ // (Required) A human-readable message.
+ string message = 3;
+
+ // (Optional) An optional trace of the functions involved. For example, in
+ // Java this can include multiple causes and multiple suppressed exceptions.
+ string trace = 4;
+
+ // (Optional) A reference to the instruction this log statement is associated
+ // with.
+ int64 instruction_reference = 5;
+
+ // (Optional) A reference to the primitive transform this log statement is
+ // associated with.
+ int64 primitive_transform_reference = 6;
+
+ // (Optional) Human-readable name of the function or method being invoked,
+ // with optional context such as the class or package name. The format can
+ // vary by language. For example:
+ // qual.if.ied.Class.method (Java)
+ // dir/package.func (Go)
+ // module.function (Python)
+ // file.cc:382 (C++)
+ string log_location = 7;
+
+ // (Optional) The name of the thread this log statement is associated with.
+ string thread = 8;
+}
+
+message LogControl {
+}
+
+// Stable
+service BeamFnLogging {
+ // Allows for the SDK to emit log entries which the runner can
+ // associate with the active job.
+ rpc Logging(
+ // A stream of log entries batched into lists emitted by the SDK harness.
+ stream LogEntry.List
+ ) returns (
+ // A stream of log control messages used to configure the SDK.
+ stream LogControl
+ ) {}
+}
+
+/*
+ * Environment types
+ */
+message ApiServiceDescriptor {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ int64 id = 1;
+
+ // (Required) The URL to connect to.
+ string url = 2;
+
+ // (Optional) The method for authentication. If unspecified, access to the
+ // url is already being performed in a trusted context (e.g. localhost,
+ // private network).
+ oneof authentication {
+ OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3;
+ }
+}
+
+message OAuth2ClientCredentialsGrant {
+ // (Required) The URL to submit a "client_credentials" grant type request for
+ // an OAuth access token which will be used as a bearer token for requests.
+ string url = 1;
+}
+
+// A Docker container configuration for launching the SDK Fn Harness to execute
+// user specified functions.
+message DockerContainer {
+ // (Required) A pipeline level unique id which can be used as a reference to
+ // refer to this.
+ int64 id = 1;
+
+ // (Required) The Docker container URI
+ // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
+ string uri = 2;
+
+ // (Optional) Docker registry specification.
+ // If unspecified, the uri is expected to be able to be fetched without
+ // requiring additional configuration by a runner.
+ int64 registry_reference = 3;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
new file mode 100644
index 0000000..8364d9a
--- /dev/null
+++ b/sdks/common/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-parent</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-common-parent</artifactId>
+
+ <packaging>pom</packaging>
+
+ <name>Apache Beam :: SDKs :: Common</name>
+
+ <modules>
+ <module>fn-api</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 35b5ed3..91ab9be 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -34,12 +34,42 @@
unapproved artifact license.
-->
<Match>
+ <Class name="org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver"/>
+ <Method name="onCompleted"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+ <!--
+ Findbugs limitation when using Void typed CompletableFuture. This is a limitation of FindBugs as discussed here:
+ https://github.com/findbugsproject/findbugs/issues/79
+ -->
+ </Match>
+
+ <Match>
+ <Class name="org.apache.beam.fn.harness.data.BeamFnDataInboundObserver"/>
+ <Method name="accept"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+ <!--
+ Findbugs limitation when using Void typed CompletableFuture. This is a limitation of FindBugs as discussed here:
+ https://github.com/findbugsproject/findbugs/issues/79
+ -->
+ </Match>
+
+ <Match>
+ <Class name="org.apache.beam.fn.harness.logging.BeamFnLoggingClient$LogControlObserver"/>
+ <Method name="onCompleted"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+ <!--
+ Findbugs limitation when using Void typed CompletableFuture. This is a limitation of FindBugs as discussed here:
+ https://github.com/findbugsproject/findbugs/issues/79
+ -->
+ </Match>
+
+ <Match>
<Class name="org.apache.beam.sdk.coders.AvroCoder$SerializableSchemaSupplier"/>
<Field name="schema"/>
<Bug pattern="SE_BAD_FIELD"/>
<!--
writeReplace makes this object serializable. This is a limitation of FindBugs as discussed here:
- http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
+ http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
-->
</Match>
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
new file mode 100644
index 0000000..e164ee0
--- /dev/null
+++ b/sdks/java/harness/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <packaging>jar</packaging>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-parent</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-harness</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: Harness</name>
+ <description>This contains the SDK Fn Harness for Beam Java</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Override Beam parent to allow Java8 -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-fn-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-lite</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <classifier>linux-x86_64</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
new file mode 100644
index 0000000..3e06f38
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.TextFormat;
+import java.io.PrintStream;
+import java.util.EnumMap;
+import org.apache.beam.fn.harness.channel.ManagedChannelFactory;
+import org.apache.beam.fn.harness.control.BeamFnControlClient;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler;
+import org.apache.beam.fn.harness.control.RegisterHandler;
+import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
+import org.apache.beam.fn.harness.fn.ThrowingFunction;
+import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
+import org.apache.beam.fn.harness.stream.StreamObserverFactory;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point into the Beam SDK Fn Harness for Java.
+ *
+ * <p>This entry point expects the following environment variables:
+ * <ul>
+ * <li>LOGGING_API_SERVICE_DESCRIPTOR: A
+ * {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text
+ * representing the endpoint that is to be connected to for the Beam Fn Logging service.</li>
+ * <li>CONTROL_API_SERVICE_DESCRIPTOR: A
+ * {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text
+ * representing the endpoint that is to be connected to for the Beam Fn Control service.</li>
+ * <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions}
+ * for further details.</li>
+ * </ul>
+ */
+public class FnHarness {
+ private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
+ private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
+ private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
+ private static final Logger LOGGER = LoggerFactory.getLogger(FnHarness.class);
+
+ private static BeamFnApi.ApiServiceDescriptor getApiServiceDescriptor(String env)
+ throws TextFormat.ParseException {
+ BeamFnApi.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
+ BeamFnApi.ApiServiceDescriptor.newBuilder();
+ TextFormat.merge(System.getenv(env), apiServiceDescriptorBuilder);
+ return apiServiceDescriptorBuilder.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.out.format("SDK Fn Harness started%n");
+ System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
+ System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
+ System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
+
+ PipelineOptions options = new ObjectMapper().readValue(
+ System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
+
+ BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor =
+ getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);
+
+ BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor =
+ getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR);
+
+ main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
+ }
+
+ public static void main(PipelineOptions options,
+ BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor,
+ BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
+ IOChannelUtils.registerIOFactories(options);
+
+ ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
+ StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
+ PrintStream originalErrStream = System.err;
+
+ try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
+ options,
+ loggingApiServiceDescriptor,
+ channelFactory::forDescriptor,
+ streamObserverFactory::from)) {
+
+ LOGGER.info("Fn Harness started");
+ EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+ ThrowingFunction<BeamFnApi.InstructionRequest,
+ BeamFnApi.InstructionResponse.Builder>> handlers =
+ new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
+
+ RegisterHandler fnApiRegistry = new RegisterHandler();
+ BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
+ options, channelFactory::forDescriptor, streamObserverFactory::from);
+
+ ProcessBundleHandler processBundleHandler =
+ new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer);
+ handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
+ fnApiRegistry::register);
+ handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
+ processBundleHandler::processBundle);
+ BeamFnControlClient control = new BeamFnControlClient(controlApiServiceDescriptor,
+ channelFactory::forDescriptor,
+ streamObserverFactory::from,
+ handlers);
+
+ LOGGER.info("Entering instruction processing loop");
+ control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
+ } catch (Throwable t) {
+ t.printStackTrace(originalErrStream);
+ } finally {
+ originalErrStream.println("Shutting SDK harness down.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
new file mode 100644
index 0000000..d26f4a5
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.channel;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Uses {@link PipelineOptions} to configure which underlying {@link ManagedChannel} implementation
+ * to use.
+ */
+public abstract class ManagedChannelFactory {
+ public static ManagedChannelFactory from(PipelineOptions options) {
+ List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+ if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
+ io.netty.channel.epoll.Epoll.ensureAvailability();
+ return new Epoll();
+ }
+ return new Default();
+ }
+
+ public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
+
+ /**
+ * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
+ * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an
+ * {@link EpollSocketChannel}.
+ */
+ private static class Epoll extends ManagedChannelFactory {
+ @Override
+ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+ SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
+ return NettyChannelBuilder.forAddress(address)
+ .channelType(address instanceof DomainSocketAddress
+ ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
+ .eventLoopGroup(new EpollEventLoopGroup())
+ .usePlaintext(true)
+ .build();
+ }
+ }
+
+ /**
+ * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create
+ * instances.
+ */
+ private static class Default extends ManagedChannelFactory {
+ @Override
+ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+ return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
+ .usePlaintext(true)
+ .build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
new file mode 100644
index 0000000..a27d542
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.channel;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/** Creates a {@link SocketAddress} based upon a supplied string. */
+public class SocketAddressFactory {
+ private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
+
+ /**
+ * Parse a {@link SocketAddress} from the given string.
+ */
+ public static SocketAddress createFrom(String value) {
+ if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
+ // Unix Domain Socket address.
+ // Create the underlying file for the Unix Domain Socket.
+ String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
+ File file = new File(filePath);
+ if (!file.isAbsolute()) {
+ throw new IllegalArgumentException("File path must be absolute: " + filePath);
+ }
+ try {
+ if (file.createNewFile()) {
+ // If this application created the file, delete it when the application exits.
+ file.deleteOnExit();
+ }
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ // Create the SocketAddress referencing the file.
+ return new DomainSocketAddress(file);
+ } else {
+ // Standard TCP/IP address.
+ HostAndPort hostAndPort = HostAndPort.fromString(value);
+ checkArgument(hostAndPort.hasPort(),
+ "Address must be a unix:// path or be in the form host:port. Got: %s", value);
+ return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
new file mode 100644
index 0000000..6323166
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC channel management.
+ */
+package org.apache.beam.fn.harness.channel;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
new file mode 100644
index 0000000..7f44a01
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.EnumMap;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.ThrowingFunction;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client for the Beam Fn Control API. Uses an unbounded internal queue to pull down
+ * an unbounded number of requests.
+ *
+ * <p>Also can delegate to a set of handlers based upon the
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest.RequestCase request type}.
+ *
+ * <p>When the inbound instruction stream finishes successfully, the {@code onFinish} is
+ * completed successfully signaling to the caller that this client will not produce any more
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s. If the inbound instruction stream
+ * errors, the {@code onFinish} is completed exceptionally propagating the failure reason
+ * to the caller and signaling that this client will not produce any more
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s.
+ */
+public class BeamFnControlClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnControlClient.class);
+ // Sentinel enqueued when the inbound stream terminates so that
+ // processInstructionRequests() wakes from its blocking take() and exits.
+ // Uses Long.MIN_VALUE as an instruction id on the assumption that the runner
+ // never issues it; matched by reference, not by value.
+ private static final BeamFnApi.InstructionRequest POISON_PILL =
+ BeamFnApi.InstructionRequest.newBuilder().setInstructionId(Long.MIN_VALUE).build();
+
+ // Outbound stream used to send InstructionResponses back to the control service.
+ private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver;
+ // Unbounded queue of inbound instructions awaiting processing.
+ private final BlockingDeque<BeamFnApi.InstructionRequest> bufferedInstructions;
+ // Per request-type delegates used to compute responses.
+ private final EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+ ThrowingFunction<BeamFnApi.InstructionRequest,
+ BeamFnApi.InstructionResponse.Builder>> handlers;
+ // Completed (normally or exceptionally) when the inbound stream terminates.
+ private final CompletableFuture<Void> onFinish;
+
+ /**
+ * Creates a client for the Beam Fn Control service described by
+ * {@code apiServiceDescriptor}, immediately opening the bidirectional control
+ * stream and buffering inbound requests into an internal unbounded queue.
+ *
+ * @param channelFactory creates a {@link ManagedChannel} for the service descriptor
+ * @param streamObserverFactory produces the outbound observer given the inbound observer
+ * @param handlers per request-type delegates; request types without a registered
+ * handler receive an error response (see {@code missingHandler})
+ */
+ public BeamFnControlClient(
+ BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+ Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+ BiFunction<Function<StreamObserver<BeamFnApi.InstructionRequest>,
+ StreamObserver<BeamFnApi.InstructionResponse>>,
+ StreamObserver<BeamFnApi.InstructionRequest>,
+ StreamObserver<BeamFnApi.InstructionResponse>> streamObserverFactory,
+ EnumMap<BeamFnApi.InstructionRequest.RequestCase,
+ ThrowingFunction<BeamFnApi.InstructionRequest,
+ BeamFnApi.InstructionResponse.Builder>> handlers) {
+ this.bufferedInstructions = new LinkedBlockingDeque<>();
+ this.outboundObserver = streamObserverFactory.apply(
+ BeamFnControlGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::control,
+ new InboundObserver());
+ this.handlers = handlers;
+ this.onFinish = new CompletableFuture<>();
+ }
+
+ /**
+ * A {@link StreamObserver} for the inbound stream that completes the future on stream
+ * termination.
+ */
+ private class InboundObserver implements StreamObserver<BeamFnApi.InstructionRequest> {
+ @Override
+ public void onNext(BeamFnApi.InstructionRequest value) {
+ LOGGER.info("InstructionRequest received {}", value);
+ // The queue is unbounded, so enqueueing does not block; interrupts are ignored.
+ Uninterruptibles.putUninterruptibly(bufferedInstructions, value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ placePoisonPillIntoQueue();
+ onFinish.completeExceptionally(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ placePoisonPillIntoQueue();
+ onFinish.complete(null);
+ }
+
+ /**
+ * This method emulates {@link Uninterruptibles#putUninterruptibly} but places the
+ * element at the front of the queue.
+ *
+ * <p>We place the poison pill at the front of the queue because if the server shuts
+ * down, any remaining instructions can be discarded.
+ */
+ private void placePoisonPillIntoQueue() {
+ while (true) {
+ try {
+ bufferedInstructions.putFirst(POISON_PILL);
+ return;
+ } catch (InterruptedException e) {
+ // Ignored until we place the poison pill into the queue
+ }
+ }
+ }
+ }
+
+ /**
+ * Dispatches buffered instruction requests to the supplied executor until the
+ * Beam Fn Control server hangs up or fails exceptionally.
+ *
+ * <p>Blocks on the internal queue. Returns normally when the inbound stream
+ * completed successfully; otherwise {@code onFinish.get()} rethrows the stream
+ * failure as an {@link ExecutionException}.
+ *
+ * <p>NOTE(review): responses are sent from executor threads, so the outbound
+ * observer's {@code onNext} may be invoked concurrently — confirm the observer
+ * produced by {@code streamObserverFactory} is safe for multi-threaded use.
+ */
+ public void processInstructionRequests(Executor executor)
+ throws InterruptedException, ExecutionException {
+ BeamFnApi.InstructionRequest request;
+ // Reference comparison is intentional: the poison pill is a unique sentinel instance.
+ while ((request = bufferedInstructions.take()) != POISON_PILL) {
+ BeamFnApi.InstructionRequest currentRequest = request;
+ executor.execute(
+ () -> sendInstructionResponse(delegateOnInstructionRequestType(currentRequest)));
+ }
+ onFinish.get();
+ }
+
+ /**
+ * Applies the handler registered for the request's type (falling back to
+ * {@code missingHandler}), stamping the request's instruction id onto the response.
+ * Any exception thrown by a handler is converted into an error response carrying the
+ * exception's stack trace rather than being propagated.
+ */
+ public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
+ BeamFnApi.InstructionRequest value) {
+ try {
+ return handlers.getOrDefault(value.getRequestCase(), this::missingHandler)
+ .apply(value)
+ .setInstructionId(value.getInstructionId())
+ .build();
+ } catch (Exception e) {
+ return BeamFnApi.InstructionResponse.newBuilder()
+ .setInstructionId(value.getInstructionId())
+ .setError(getStackTraceAsString(e))
+ .build();
+ }
+ }
+
+ /** Sends the response over the outbound control stream. */
+ public void sendInstructionResponse(BeamFnApi.InstructionResponse value) {
+ outboundObserver.onNext(value);
+ }
+
+ /** Fallback handler producing an error response for unregistered request types. */
+ private BeamFnApi.InstructionResponse.Builder missingHandler(
+ BeamFnApi.InstructionRequest request) {
+ return BeamFnApi.InstructionResponse.newBuilder().setError(
+ String.format("Unknown InstructionRequest type %s", request.getRequestCase()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
new file mode 100644
index 0000000..05c2aab
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fake.FakeAggregatorFactory;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.BeamFnDataReadRunner;
+import org.apache.beam.runners.core.BeamFnDataWriteRunner;
+import org.apache.beam.runners.core.BoundedSourceRunner;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing
+ * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec},
+ * wiring them together based upon the {@code input} and {@code output} map definitions.
+ *
+ * <p>Finally executes the DAG based graph by starting all runners in reverse topological order,
+ * and finishing all runners in forward topological order.
+ */
+public class ProcessBundleHandler {
+ // TODO: What should the initial set of URNs be?
+ private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
+ private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
+ private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
+ private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProcessBundleHandler.class);
+
+ // Pipeline options passed through to source runners.
+ private final PipelineOptions options;
+ // Resolves registered Fn API messages (coders, bundle descriptors) by reference id.
+ private final Function<Long, Message> fnApiRegistry;
+ // Client used by data read/write runners to move elements over the data plane.
+ private final BeamFnDataClient beamFnDataClient;
+
+ public ProcessBundleHandler(
+ PipelineOptions options,
+ Function<Long, Message> fnApiRegistry,
+ BeamFnDataClient beamFnDataClient) {
+ this.options = options;
+ this.fnApiRegistry = fnApiRegistry;
+ this.beamFnDataClient = beamFnDataClient;
+ }
+
+ /**
+ * Materializes the runner for a single primitive transform based on its
+ * {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} URN, registering its
+ * start/finish functions and wiring its input consumers.
+ *
+ * @param processBundleInstructionId supplies the current bundle's instruction id
+ * @param consumers resolves the already-registered downstream consumers for a target
+ * @param addConsumer registers a consumer for one of this transform's input targets
+ * @param addStartFunction collects functions to run before processing elements
+ * @param addFinishFunction collects functions to run after processing elements
+ * @throws IOException if the runner cannot be constructed
+ */
+ protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
+ BeamFnApi.PrimitiveTransform primitiveTransform,
+ Supplier<Long> processBundleInstructionId,
+ Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
+ BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+ BeamFnApi.FunctionSpec functionSpec = primitiveTransform.getFunctionSpec();
+
+ // For every output PCollection, create a map from output name to Consumer
+ ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>>
+ outputMapBuilder = ImmutableMap.builder();
+ for (Map.Entry<String, BeamFnApi.PCollection> entry :
+ primitiveTransform.getOutputsMap().entrySet()) {
+ outputMapBuilder.put(
+ entry.getKey(),
+ consumers.apply(
+ BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(primitiveTransform.getId())
+ .setName(entry.getKey())
+ .build()));
+ }
+ ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap =
+ outputMapBuilder.build();
+
+ // Based upon the function spec, populate the start/finish/consumer information.
+ // Note: the default case appears first and declares target/coderSpec; all cases of
+ // a Java switch share one scope, so these declarations are visible in the labeled
+ // cases below.
+ ThrowingConsumer<WindowedValue<InputT>> consumer;
+ switch (functionSpec.getUrn()) {
+ default:
+ BeamFnApi.Target target;
+ BeamFnApi.Coder coderSpec;
+ throw new IllegalArgumentException(
+ String.format("Unknown FunctionSpec %s", functionSpec));
+
+ case DATA_OUTPUT_URN:
+ target = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(primitiveTransform.getId())
+ .setName(getOnlyElement(primitiveTransform.getOutputsMap().keySet()))
+ .build();
+ coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply(
+ getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference());
+ BeamFnDataWriteRunner<InputT> remoteGrpcWriteRunner =
+ new BeamFnDataWriteRunner<>(
+ functionSpec,
+ processBundleInstructionId,
+ target,
+ coderSpec,
+ beamFnDataClient);
+ addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput);
+ consumer = remoteGrpcWriteRunner::consume;
+ addFinishFunction.accept(remoteGrpcWriteRunner::close);
+ break;
+
+ case DATA_INPUT_URN:
+ target = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(primitiveTransform.getId())
+ .setName(getOnlyElement(primitiveTransform.getInputsMap().keySet()))
+ .build();
+ // NOTE(review): the coder is looked up via the outputs map — presumably the
+ // coder of the elements this read produces; confirm the outputs map (not the
+ // inputs map) is the intended source here.
+ coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply(
+ getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference());
+ BeamFnDataReadRunner<OutputT> remoteGrpcReadRunner =
+ new BeamFnDataReadRunner<>(
+ functionSpec,
+ processBundleInstructionId,
+ target,
+ coderSpec,
+ beamFnDataClient,
+ outputMap);
+ addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation);
+ // Reads have no upstream consumer; elements arrive via the data plane.
+ consumer = null;
+ addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes);
+ break;
+
+ case JAVA_DO_FN_URN:
+ DoFnRunner<InputT, OutputT> doFnRunner = createDoFnRunner(functionSpec, outputMap);
+ addStartFunction.accept(doFnRunner::startBundle);
+ addFinishFunction.accept(doFnRunner::finishBundle);
+ consumer = doFnRunner::processElement;
+ break;
+
+ case JAVA_SOURCE_URN:
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ BoundedSourceRunner<BoundedSource<OutputT>, OutputT> sourceRunner =
+ createBoundedSourceRunner(functionSpec, outputMap);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ThrowingConsumer<WindowedValue<?>> sourceConsumer =
+ (ThrowingConsumer)
+ (ThrowingConsumer<WindowedValue<BoundedSource<OutputT>>>)
+ sourceRunner::runReadLoop;
+ // TODO: Remove and replace with source being sent across gRPC port
+ addStartFunction.accept(sourceRunner::start);
+ // Raw cast bridges the wildcard consumer type to the method's InputT consumer.
+ consumer = (ThrowingConsumer) sourceConsumer;
+ break;
+ }
+
+ // Register the consumer (when one exists) for every input target of this transform.
+ if (consumer != null) {
+ for (Map.Entry<String, BeamFnApi.Target.List> entry :
+ primitiveTransform.getInputsMap().entrySet()) {
+ for (BeamFnApi.Target target : entry.getValue().getTargetList()) {
+ addConsumer.accept(target, consumer);
+ }
+ }
+ }
+ }
+
+ /**
+ * Executes a {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}: looks up
+ * the referenced bundle descriptor, wires consumers for each primitive transform in
+ * reverse topological order, then runs all start functions followed by all finish
+ * functions (in forward topological order).
+ *
+ * <p>The reverse traversal works because {@code Multimap.get} returns a live view:
+ * consumers a downstream transform registers are visible when an upstream transform
+ * later looks up its output targets.
+ */
+ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request)
+ throws Exception {
+ BeamFnApi.InstructionResponse.Builder response =
+ BeamFnApi.InstructionResponse.newBuilder()
+ .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
+
+ long bundleId = request.getProcessBundle().getProcessBundleDescriptorReference();
+ BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
+ (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId);
+
+ Multimap<BeamFnApi.Target,
+ ThrowingConsumer<WindowedValue<Object>>> outputTargetToConsumer =
+ HashMultimap.create();
+ List<ThrowingRunnable> startFunctions = new ArrayList<>();
+ List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+ // We process the primitive transform list in reverse order
+ // because we assume that the runner provides it in topological order.
+ // This means that all the start/finish functions will be in reverse topological order.
+ for (BeamFnApi.PrimitiveTransform primitiveTransform :
+ Lists.reverse(bundleDescriptor.getPrimitiveTransformList())) {
+ createConsumersForPrimitiveTransform(
+ primitiveTransform,
+ request::getInstructionId,
+ outputTargetToConsumer::get,
+ outputTargetToConsumer::put,
+ startFunctions::add,
+ finishFunctions::add);
+ }
+
+ // Already in reverse order so we don't need to do anything.
+ for (ThrowingRunnable startFunction : startFunctions) {
+ LOGGER.debug("Starting function {}", startFunction);
+ startFunction.run();
+ }
+
+ // Need to reverse this since we want to call finish in topological order.
+ for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) {
+ LOGGER.debug("Finishing function {}", finishFunction);
+ finishFunction.run();
+ }
+
+ return response;
+ }
+
+ /**
+ * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}.
+ *
+ * <p>The spec's payload is unpacked as a {@link BytesValue} holding a Java-serialized
+ * {@link DoFnInfo}. The transform's string output names are parsed as longs and must
+ * exactly match the DoFnInfo's output-id key set; outputs are then re-keyed by
+ * {@link TupleTag} for the runner's {@link OutputManager}.
+ */
+ private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner(
+ BeamFnApi.FunctionSpec functionSpec,
+ Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+ ByteString serializedFn;
+ try {
+ serializedFn = functionSpec.getData().unpack(BytesValue.class).getValue();
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException(
+ String.format("Unable to unwrap DoFn %s", functionSpec), e);
+ }
+ DoFnInfo<?, ?> doFnInfo =
+ (DoFnInfo<?, ?>)
+ SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo");
+
+ // The transform's output names (string-encoded longs) must match the DoFnInfo's ids.
+ checkArgument(
+ Objects.equals(
+ new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)),
+ doFnInfo.getOutputMap().keySet()),
+ "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.",
+ outputMap.keySet(),
+ doFnInfo.getOutputMap());
+
+ // Re-key the consumers from string output names to the DoFn's TupleTags.
+ ImmutableMultimap.Builder<TupleTag<?>,
+ ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput =
+ ImmutableMultimap.builder();
+ for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) {
+ tagToOutput.putAll(entry.getValue(), outputMap.get(Long.toString(entry.getKey())));
+ }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tagBasedOutputMap =
+ (Map) tagToOutput.build().asMap();
+
+ // Fans each produced element out to all consumers registered for its tag.
+ OutputManager outputManager =
+ new OutputManager() {
+ Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput =
+ tagBasedOutputMap;
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ try {
+ Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+ tupleTagToOutput.get(tag);
+ if (consumers == null) {
+ // TODO: Should we handle undeclared outputs, if so how?
+ throw new UnsupportedOperationException(String.format(
+ "Unable to output %s on unknown output %s", output, tag));
+ }
+ for (ThrowingConsumer<WindowedValue<?>> consumer : consumers) {
+ consumer.accept(output);
+ }
+ } catch (Throwable t) {
+ // Consumers declare throws; rewrap for the non-throwing OutputManager contract.
+ throw new RuntimeException(t);
+ }
+ }
+ };
+
+ @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+ DoFnRunner<InputT, OutputT> runner =
+ DoFnRunners.simpleRunner(
+ PipelineOptionsFactory.create(), /* TODO */
+ (DoFn) doFnInfo.getDoFn(),
+ NullSideInputReader.empty(), /* TODO */
+ outputManager,
+ (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
+ new ArrayList<>(doFnInfo.getOutputMap().values()),
+ new FakeStepContext(),
+ new FakeAggregatorFactory(),
+ (WindowingStrategy) doFnInfo.getWindowingStrategy());
+ return runner;
+ }
+
+ /** Constructs a {@link BoundedSourceRunner} feeding the supplied output consumers. */
+ private <InputT extends BoundedSource<OutputT>, OutputT>
+ BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner(
+ BeamFnApi.FunctionSpec functionSpec,
+ Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ BoundedSourceRunner<InputT, OutputT> runner =
+ new BoundedSourceRunner(options, functionSpec, outputMap);
+ return runner;
+ }
+}