You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/01 17:04:35 UTC
[1/2] beam git commit: Inline rather than reference FunctionSpecs.
Repository: beam
Updated Branches:
refs/heads/master 79b1395c2 -> d84b06791
Inline rather than reference FunctionSpecs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d390406e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d390406e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d390406e
Branch: refs/heads/master
Commit: d390406e27112faed31233d7daef1f650a31cd0f
Parents: 79b1395
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Feb 28 15:51:24 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Mar 1 09:04:30 2017 -0800
----------------------------------------------------------------------
.../src/main/proto/beam_runner_api.proto | 39 +++++++++-----------
.../beam/sdk/util/WindowingStrategies.java | 18 ++-------
2 files changed, 20 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 58532b2..44ead56 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -47,10 +47,6 @@ message Components {
// (Required) A map from pipeline-scoped id to Environment.
map<string, Environment> environments = 5;
-
- // (Required) A map from pipeline-scoped id to FunctionSpec,
- // a record for a particular user-defined function.
- map<string, FunctionSpec> function_specs = 6;
}
// A disjoint union of all the things that may contain references
@@ -207,8 +203,8 @@ message PCollection {
// The payload for the primitive ParDo transform.
message ParDoPayload {
- // (Required) The pipeline-scoped id of the FunctionSpec for the DoFn.
- string fn_id = 1;
+ // (Required) The FunctionSpec of the DoFn.
+ FunctionSpec do_fn = 1;
// (Required) Additional pieces of context the DoFn may require that
// are not otherwise represented in the payload.
@@ -266,9 +262,8 @@ enum IsBounded {
// The payload for the primitive Read transform.
message ReadPayload {
- // (Required) The pipeline-scoped id of the FunctionSpec of the source for
- // this Read.
- string source_id = 1;
+ // (Required) The FunctionSpec of the source for this Read.
+ FunctionSpec source = 1;
// (Required) Whether the source is bounded or unbounded
IsBounded is_bounded = 2;
@@ -279,15 +274,15 @@ message ReadPayload {
// The payload for the WindowInto transform.
message WindowIntoPayload {
- // (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn.
- string fn_id = 1;
+ // (Required) The FunctionSpec of the WindowFn.
+ FunctionSpec window_fn = 1;
}
// The payload for the special-but-not-primitive Combine transform.
message CombinePayload {
- // (Required) The pipeline-scoped id of the FunctionSpec for the CombineFn.
- string fn_id = 1;
+ // (Required) The FunctionSpec of the CombineFn.
+ FunctionSpec combine_fn = 1;
// (Required) A reference to the Coder to use for accumulators of the CombineFn
string accumulator_coder_id = 2;
@@ -325,10 +320,10 @@ message Coder {
// TODO: consider inlining field on PCollection
message WindowingStrategy {
- // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
- // assigns windows, merges windows, and shifts timestamps before they are
+ // (Required) The FunctionSpec of the UDF that assigns windows,
+ // merges windows, and shifts timestamps before they are
// combined according to the OutputTime.
- string fn_id = 1;
+ FunctionSpec window_fn = 1;
// (Required) Whether or not the window fn is merging.
//
@@ -584,20 +579,20 @@ message SideInput {
// URN)
UrnWithParameter access_pattern = 1;
- // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
- // adapts a particular access_pattern to a user-facing view type.
+ // (Required) The FunctionSpec of the UDF that adapts a particular
+ // access_pattern to a user-facing view type.
//
// For example, View.asSingleton() may include a `view_fn` that adapts a
// specially-designed multimap to a single value per window.
- string view_fn_id = 2;
+ FunctionSpec view_fn = 2;
- // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
- // maps a main input window to a side input window.
+ // (Required) The FunctionSpec of the UDF that maps a main input window
+ // to a side input window.
//
// For example, when the main input is in fixed windows of one hour, this
// can specify that the side input should be accessed according to the day
// in which that hour falls.
- string window_mapping_fn_id = 3;
+ FunctionSpec window_mapping_fn = 3;
}
// An environment for executing UDFs. Generally an SDK container URL, but
http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
index 3047da1..7bc581c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
@@ -195,10 +195,6 @@ public class WindowingStrategies implements Serializable {
public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
throws IOException {
- // TODO: have an inverted components to find the id for a thing already
- // in the components
- String windowFnId = UUID.randomUUID().toString();
-
RunnerApi.MessageWithComponents windowFnWithComponents =
toProto(windowingStrategy.getWindowFn());
@@ -209,16 +205,11 @@ public class WindowingStrategies implements Serializable {
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
.setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
- .setFnId(windowFnId);
+ .setWindowFn(windowFnWithComponents.getFunctionSpec());
return RunnerApi.MessageWithComponents.newBuilder()
.setWindowingStrategy(windowingStrategyProto)
- .setComponents(
- windowFnWithComponents
- .getComponents()
- .toBuilder()
- .putFunctionSpecs(windowFnId, windowFnWithComponents.getFunctionSpec()))
- .build();
+ .setComponents(windowFnWithComponents.getComponents()).build();
}
/**
@@ -246,10 +237,7 @@ public class WindowingStrategies implements Serializable {
RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
throws InvalidProtocolBufferException {
- FunctionSpec windowFnSpec =
- components
- .getFunctionSpecsMap()
- .get(proto.getFnId());
+ FunctionSpec windowFnSpec = proto.getWindowFn();
checkArgument(
windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
[2/2] beam git commit: Closes #2131
Posted by ro...@apache.org.
Closes #2131
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d84b0679
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d84b0679
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d84b0679
Branch: refs/heads/master
Commit: d84b0679178ab803081ee3e05fe222d234e94c30
Parents: 79b1395 d390406
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Mar 1 09:04:31 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Mar 1 09:04:31 2017 -0800
----------------------------------------------------------------------
.../src/main/proto/beam_runner_api.proto | 39 +++++++++-----------
.../beam/sdk/util/WindowingStrategies.java | 18 ++-------
2 files changed, 20 insertions(+), 37 deletions(-)
----------------------------------------------------------------------