You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/22 19:34:38 UTC
[1/2] beam git commit: Closes #3418
Repository: beam
Updated Branches:
refs/heads/master 3785b5baf -> 2a5520018
Closes #3418
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a552001
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a552001
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a552001
Branch: refs/heads/master
Commit: 2a5520018fb25f96b57e52231a8cc0b191abfce0
Parents: 3785b5b 8cab153
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:34:27 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 12:34:27 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 6 ++++-
.../apache_beam/runners/worker/sdk_worker.py | 26 +++++++++++++++-----
2 files changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Java Dataflow runner harness compatibility.
Posted by ro...@apache.org.
Java Dataflow runner harness compatibility.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8cab1533
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8cab1533
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8cab1533
Branch: refs/heads/master
Commit: 8cab15338f811f880c6cfb820051cf355f92986b
Parents: 3785b5b
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jun 21 18:09:48 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 12:34:27 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 6 ++++-
.../apache_beam/runners/worker/sdk_worker.py | 26 +++++++++++++++-----
2 files changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8cab1533/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index dabb7d6..a27e293 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -17,6 +17,7 @@
"""A PipelineRunner using the SDK harness.
"""
+import base64
import collections
import json
import logging
@@ -204,11 +205,14 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
else:
# Otherwise serialize the source and execute it there.
# TODO: Use SDFs with an initial impulse.
+ # The Dataflow runner harness strips the base64 encoding. Do the same
+ # here until we get the same thing back that we sent in.
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=sdk_worker.PYTHON_SOURCE_URN,
parameter=proto_utils.pack_Any(
wrappers_pb2.BytesValue(
- value=pickler.dumps(operation.source.source))))
+ value=base64.b64decode(
+ pickler.dumps(operation.source.source)))))
elif isinstance(operation, operation_specs.WorkerDoFn):
# Record the contents of each side input for access via the state api.
http://git-wip-us.apache.org/repos/asf/beam/blob/8cab1533/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index fd7ecc4..a2c9f42 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
+import base64
import collections
import json
import logging
@@ -195,7 +196,7 @@ def pack_function_spec_data(value, urn, id=None):
# pylint: enable=redefined-builtin
-# TODO(vikasrk): move this method to ``coders.py`` in the SDK.
+# TODO(vikasrk): Consistently use same format everywhere.
def load_compressed(compressed_data):
"""Returns a decompressed and deserialized python object."""
# Note: SDK uses ``pickler.dumps`` to serialize certain python objects
@@ -259,6 +260,10 @@ class SdkHarness(object):
try:
response = self.worker.do_instruction(work_request)
except Exception: # pylint: disable=broad-except
+ logging.error(
+ 'Error processing instruction %s',
+ work_request.instruction_id,
+ exc_info=True)
response = beam_fn_api_pb2.InstructionResponse(
instruction_id=work_request.instruction_id,
error=traceback.format_exc())
@@ -319,10 +324,10 @@ class SdkWorker(object):
return response
def create_execution_tree(self, descriptor):
- if descriptor.primitive_transform:
- return self.create_execution_tree_from_fn_api(descriptor)
- else:
+ if descriptor.transforms:
return self.create_execution_tree_from_runner_api(descriptor)
+ else:
+ return self.create_execution_tree_from_fn_api(descriptor)
def create_execution_tree_from_runner_api(self, descriptor):
# TODO(robertwb): Figure out the correct prefix to use for output counters
@@ -551,7 +556,15 @@ class BeamTransformFactory(object):
return creator(self, transform_id, transform_proto, parameter, consumers)
def get_coder(self, coder_id):
- return self.context.coders.get_by_id(coder_id)
+ coder_proto = self.descriptor.coders[coder_id]
+ if coder_proto.spec.spec.urn:
+ return self.context.coders.get_by_id(coder_id)
+ else:
+ # No URN, assume cloud object encoding json bytes.
+ return operation_specs.get_coder_from_spec(
+ json.loads(
+ proto_utils.unpack_Any(coder_proto.spec.spec.parameter,
+ wrappers_pb2.BytesValue).value))
def get_output_coders(self, transform_proto):
return {
@@ -618,7 +631,8 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers):
@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue)
def create(factory, transform_id, transform_proto, parameter, consumers):
- source = pickler.loads(parameter.value)
+ # The Dataflow runner harness strips the base64 encoding.
+ source = pickler.loads(base64.b64encode(parameter.value))
spec = operation_specs.WorkerRead(
iobase.SourceBundle(1.0, source, None, None),
[WindowedValueCoder(source.default_output_coder())])