You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:44 UTC
[32/50] [abbrv] beam git commit: [BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API concepts).
[BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API concepts).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f6117ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f6117ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f6117ff
Branch: refs/heads/DSL_SQL
Commit: 1f6117ffb23fc179a699cf11ebc2620af6cf2d4c
Parents: e014db6
Author: Luke Cwik <lc...@google.com>
Authored: Fri Jun 30 10:21:55 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700
----------------------------------------------------------------------
.../fn-api/src/main/proto/beam_fn_api.proto | 151 +------------------
.../harness/control/ProcessBundleHandler.java | 4 +-
.../fn/harness/control/RegisterHandler.java | 2 +-
.../fn/harness/control/RegisterHandlerTest.java | 8 +-
.../apache_beam/runners/pipeline_context.py | 2 +-
.../runners/portability/fn_api_runner.py | 2 +-
.../apache_beam/runners/worker/sdk_worker.py | 4 +-
.../runners/worker/sdk_worker_test.py | 16 +-
8 files changed, 25 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 8162bc5..9da5afe 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -38,7 +38,6 @@ option java_package = "org.apache.beam.fn.v1";
option java_outer_classname = "BeamFnApi";
import "beam_runner_api.proto";
-import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
/*
@@ -67,129 +66,6 @@ message Target {
string name = 2;
}
-// (Deprecated) Information defining a PCollection
-//
-// Migrate to Runner API.
-message PCollection {
- // (Required) A reference to a coder.
- string coder_reference = 1 [deprecated = true];
-
- // TODO: Windowing strategy, ...
-}
-
-// (Deprecated) A primitive transform within Apache Beam.
-//
-// Migrate to Runner API.
-message PrimitiveTransform {
- // (Required) A pipeline level unique id which can be used as a reference to
- // refer to this.
- string id = 1 [deprecated = true];
-
- // (Required) A function spec that is used by this primitive
- // transform to process data.
- FunctionSpec function_spec = 2 [deprecated = true];
-
- // A map of distinct input names to target definitions.
- // For example, in CoGbk this represents the tag name associated with each
- // distinct input name and a list of primitive transforms that are associated
- // with the specified input.
- map<string, Target.List> inputs = 3 [deprecated = true];
-
- // A map from local output name to PCollection definitions. For example, in
- // DoFn this represents the tag name associated with each distinct output.
- map<string, PCollection> outputs = 4 [deprecated = true];
-
- // TODO: Should we model side inputs as a special type of input for a
- // primitive transform or should it be modeled as the relationship that
- // the predecessor input will be a view primitive transform.
- // A map of from side input names to side inputs.
- map<string, SideInput> side_inputs = 5 [deprecated = true];
-
- // The user name of this step.
- // TODO: This should really be in display data and not at this level
- string step_name = 6 [deprecated = true];
-}
-
-/*
- * User Definable Functions
- *
- * This is still unstable mainly due to how we model the side input.
- */
-
-// (Deprecated) Defines the common elements of user-definable functions,
-// to allow the SDK to express the information the runner needs to execute work.
-//
-// Migrate to Runner API.
-message FunctionSpec {
- // (Required) A pipeline level unique id which can be used as a reference to
- // refer to this.
- string id = 1 [deprecated = true];
-
- // (Required) A globally unique name representing this user definable
- // function.
- //
- // User definable functions use the urn encodings registered such that another
- // may implement the user definable function within another language.
- //
- // For example:
- // urn:org.apache.beam:coder:kv:1.0
- string urn = 2 [deprecated = true];
-
- // (Required) Reference to specification of execution environment required to
- // invoke this function.
- string environment_reference = 3 [deprecated = true];
-
- // Data used to parameterize this function. Depending on the urn, this may be
- // optional or required.
- google.protobuf.Any data = 4 [deprecated = true];
-}
-
-// (Deprecated) Migrate to Runner API.
-message SideInput {
- // TODO: Coder?
-
- // For RunnerAPI.
- Target input = 1 [deprecated = true];
-
- // For FnAPI.
- FunctionSpec view_fn = 2 [deprecated = true];
-}
-
-// (Deprecated) Defines how to encode values into byte streams and decode
-// values from byte streams. A coder can be parameterized by additional
-// properties which may or may not be language agnostic.
-//
-// Coders using the urn:org.apache.beam:coder namespace must have their
-// encodings registered such that another may implement the encoding within
-// another language.
-//
-// For example:
-// urn:org.apache.beam:coder:kv:1.0
-// urn:org.apache.beam:coder:iterable:1.0
-//
-// Migrate to Runner API.
-message Coder {
- // TODO: This looks weird when compared to the other function specs
- // which use URN to differentiate themselves. Should "Coder" be embedded
- // inside the FunctionSpec data block.
-
- // The data associated with this coder used to reconstruct it.
- FunctionSpec function_spec = 1 [deprecated = true];
-
- // A list of component coder references.
- //
- // For a key-value coder, there must be exactly two component coder references
- // where the first reference represents the key coder and the second reference
- // is the value coder.
- //
- // For an iterable coder, there must be exactly one component coder reference
- // representing the value coder.
- //
- // TODO: Perhaps this is redundant with the data of the FunctionSpec
- // for known coders?
- repeated string component_coder_reference = 2 [deprecated = true];
-}
-
// A descriptor for connecting to a remote port using the Beam Fn Data API.
// Allows for communication between two environments (for example between the
// runner and the SDK).
@@ -278,33 +154,20 @@ message ProcessBundleDescriptor {
// refer to this.
string id = 1;
- // (Deprecated) A list of primitive transforms that should
- // be used to construct the bundle processing graph.
- //
- // Migrate to Runner API definitions found within transforms field.
- repeated PrimitiveTransform primitive_transform = 2 [deprecated = true];
-
- // (Deprecated) The set of all coders referenced in this bundle.
- //
- // Migrate to Runner API defintions found within codersyyy field.
- repeated Coder coders = 4 [deprecated = true];
-
// (Required) A map from pipeline-scoped id to PTransform.
- map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 5;
+ map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
// (Required) A map from pipeline-scoped id to PCollection.
- map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 6;
+ map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
// (Required) A map from pipeline-scoped id to WindowingStrategy.
- map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 7;
+ map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4;
// (Required) A map from pipeline-scoped id to Coder.
- // TODO: Rename to "coders" once deprecated coders field is removed. Unique
- // name is choosen to make it an easy search/replace
- map<string, org.apache.beam.runner_api.v1.Coder> codersyyy = 8;
+ map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
// (Required) A map from pipeline-scoped id to Environment.
- map<string, org.apache.beam.runner_api.v1.Environment> environments = 9;
+ map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
}
// A request to process a given bundle.
@@ -385,14 +248,14 @@ message PrimitiveTransformSplit {
//
// For example, a remote GRPC source will have a specific urn and data
// block containing an ElementCountRestriction.
- FunctionSpec completed_restriction = 2;
+ org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
// (Required) A function specification describing the restriction
// representing the remainder of work for the primitive transform.
//
// For example, a remote GRPC source will have a specific urn and data
// block containing an ElementCountSkipRestriction.
- FunctionSpec remaining_restriction = 3;
+ org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
}
message ProcessBundleSplitResponse {
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 4c4f73d..2a9cef8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
/**
* Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing
- * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec},
+ * the set of required runners for each {@link RunnerApi.FunctionSpec},
* wiring them together based upon the {@code input} and {@code output} map definitions.
*
* <p>Finally executes the DAG based graph by starting all runners in reverse topological order,
@@ -166,7 +166,7 @@ public class ProcessBundleHandler {
pTransform,
processBundleInstructionId,
processBundleDescriptor.getPcollectionsMap(),
- processBundleDescriptor.getCodersyyyMap(),
+ processBundleDescriptor.getCodersMap(),
pCollectionIdsToConsumers,
addStartFunction,
addFinishFunction);
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index 276a120..0e738ac 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -79,7 +79,7 @@ public class RegisterHandler {
processBundleDescriptor.getClass());
computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
for (Map.Entry<String, RunnerApi.Coder> entry
- : processBundleDescriptor.getCodersyyyMap().entrySet()) {
+ : processBundleDescriptor.getCodersMap().entrySet()) {
LOG.debug("Registering {} with type {}",
entry.getKey(),
entry.getValue().getClass());
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
index b1f4410..2b275af 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -44,14 +44,14 @@ public class RegisterHandlerTest {
.setRegister(BeamFnApi.RegisterRequest.newBuilder()
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder()
.setId("1L")
- .putCodersyyy("10L", RunnerApi.Coder.newBuilder()
+ .putCoders("10L", RunnerApi.Coder.newBuilder()
.setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:10L").build())
.build())
.build())
.build())
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("2L")
- .putCodersyyy("20L", RunnerApi.Coder.newBuilder()
+ .putCoders("20L", RunnerApi.Coder.newBuilder()
.setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:20L").build())
.build())
@@ -82,10 +82,10 @@ public class RegisterHandlerTest {
assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1),
handler.getById("2L"));
assertEquals(
- REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersyyyOrThrow("10L"),
+ REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersOrThrow("10L"),
handler.getById("10L"));
assertEquals(
- REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersyyyOrThrow("20L"),
+ REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersOrThrow("20L"),
handler.getById("20L"));
assertEquals(REGISTER_RESPONSE, responseFuture.get());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index c2ae3f3..a40069b 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -84,7 +84,7 @@ class PipelineContext(object):
def __init__(self, proto=None):
if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
proto = beam_runner_api_pb2.Components(
- coders=dict(proto.codersyyy.items()),
+ coders=dict(proto.coders.items()),
windowing_strategies=dict(proto.windowing_strategies.items()),
environments=dict(proto.environments.items()))
for name, cls in self._COMPONENT_TYPES.items():
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index c5438ad..f522864 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -261,7 +261,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
id=self._next_uid(),
transforms=transform_protos,
pcollections=pcollection_protos,
- codersyyy=dict(context_proto.coders.items()),
+ coders=dict(context_proto.coders.items()),
windowing_strategies=dict(context_proto.windowing_strategies.items()),
environments=dict(context_proto.environments.items()))
return input_data, side_input_data, runner_sinks, process_bundle_descriptor
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index e1ddfb7..ae86830 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -249,8 +249,6 @@ class SdkWorker(object):
def register(self, request, unused_instruction_id=None):
for process_bundle_descriptor in request.process_bundle_descriptor:
self.fns[process_bundle_descriptor.id] = process_bundle_descriptor
- for p_transform in list(process_bundle_descriptor.primitive_transform):
- self.fns[p_transform.function_spec.id] = p_transform.function_spec
return beam_fn_api_pb2.RegisterResponse()
def create_execution_tree(self, descriptor):
@@ -355,7 +353,7 @@ class BeamTransformFactory(object):
return creator(self, transform_id, transform_proto, parameter, consumers)
def get_coder(self, coder_id):
- coder_proto = self.descriptor.codersyyy[coder_id]
+ coder_proto = self.descriptor.coders[coder_id]
if coder_proto.spec.spec.urn:
return self.context.coders.get_by_id(coder_id)
else:
http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 553d5b8..dc72a5f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -28,6 +28,7 @@ from concurrent import futures
import grpc
from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.worker import sdk_worker
@@ -61,13 +62,12 @@ class BeamFnControlServicer(beam_fn_api_pb2.BeamFnControlServicer):
class SdkWorkerTest(unittest.TestCase):
def test_fn_registration(self):
- fns = [beam_fn_api_pb2.FunctionSpec(id=str(ix)) for ix in range(4)]
-
- process_bundle_descriptors = [beam_fn_api_pb2.ProcessBundleDescriptor(
- id=str(100+ix),
- primitive_transform=[
- beam_fn_api_pb2.PrimitiveTransform(function_spec=fn)])
- for ix, fn in enumerate(fns)]
+ process_bundle_descriptors = [
+ beam_fn_api_pb2.ProcessBundleDescriptor(
+ id=str(100+ix),
+ transforms={
+ str(ix): beam_runner_api_pb2.PTransform(unique_name=str(ix))})
+ for ix in range(4)]
test_controller = BeamFnControlServicer([beam_fn_api_pb2.InstructionRequest(
register=beam_fn_api_pb2.RegisterRequest(
@@ -83,7 +83,7 @@ class SdkWorkerTest(unittest.TestCase):
harness.run()
self.assertEqual(
harness.worker.fns,
- {item.id: item for item in fns + process_bundle_descriptors})
+ {item.id: item for item in process_bundle_descriptors})
if __name__ == "__main__":