Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:04:52 UTC

[26/50] beam git commit: Java Dataflow runner harness compatibility.

Java Dataflow runner harness compatibility.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8cab1533
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8cab1533
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8cab1533

Branch: refs/heads/gearpump-runner
Commit: 8cab15338f811f880c6cfb820051cf355f92986b
Parents: 3785b5b
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jun 21 18:09:48 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 12:34:27 2017 -0700

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py        |  6 ++++-
 .../apache_beam/runners/worker/sdk_worker.py    | 26 +++++++++++++++-----
 2 files changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8cab1533/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index dabb7d6..a27e293 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -17,6 +17,7 @@
 
 """A PipelineRunner using the SDK harness.
 """
+import base64
 import collections
 import json
 import logging
@@ -204,11 +205,14 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         else:
           # Otherwise serialize the source and execute it there.
           # TODO: Use SDFs with an initial impulse.
+          # The Dataflow runner harness strips the base64 encoding. Do the
+          # same here until we get the same thing back that we sent in.
           transform_spec = beam_runner_api_pb2.FunctionSpec(
               urn=sdk_worker.PYTHON_SOURCE_URN,
               parameter=proto_utils.pack_Any(
                   wrappers_pb2.BytesValue(
-                      value=pickler.dumps(operation.source.source))))
+                      value=base64.b64decode(
+                          pickler.dumps(operation.source.source)))))
 
       elif isinstance(operation, operation_specs.WorkerDoFn):
         # Record the contents of each side input for access via the state api.
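
This pre-decoding only works if pickler.dumps returns base64 text, which is
an assumption about the SDK pickler at this revision; under that assumption
the payload survives one decode/encode cycle losslessly. A minimal sketch of
the runner-side step (the payload below is an illustrative stand-in, not a
real pickled source):

    import base64

    # Stand-in for pickler.dumps(operation.source.source); assumed to be
    # canonical base64 text, so b64decode round-trips it losslessly.
    pickled = b'Z2xvYmFsIHNrZXRjaA=='
    value = base64.b64decode(pickled)  # the raw bytes shipped in BytesValue
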

http://git-wip-us.apache.org/repos/asf/beam/blob/8cab1533/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index fd7ecc4..a2c9f42 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
+import base64
 import collections
 import json
 import logging
@@ -195,7 +196,7 @@ def pack_function_spec_data(value, urn, id=None):
 # pylint: enable=redefined-builtin
 
 
-# TODO(vikasrk): move this method to ``coders.py`` in the SDK.
+# TODO(vikasrk): Consistently use the same format everywhere.
 def load_compressed(compressed_data):
   """Returns a decompressed and deserialized python object."""
   # Note: SDK uses ``pickler.dumps`` to serialize certain python objects
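
Since the hunk shows only the docstring, a plausible shape for the helper is
sketched below, assuming the payload is a zlib-compressed dill pickle; both
the compression scheme and the dill backend are assumptions about this
revision, not confirmed by the diff:

    import zlib
    import dill  # assumed pickle backend; Beam's pickler builds on dill

    def load_compressed_sketch(compressed_data):
        # Decompress first, then unpickle the resulting byte string.
        return dill.loads(zlib.decompress(compressed_data))
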
@@ -259,6 +260,10 @@ class SdkHarness(object):
         try:
           response = self.worker.do_instruction(work_request)
         except Exception:  # pylint: disable=broad-except
+          logging.error(
+              'Error processing instruction %s',
+              work_request.instruction_id,
+              exc_info=True)
           response = beam_fn_api_pb2.InstructionResponse(
               instruction_id=work_request.instruction_id,
               error=traceback.format_exc())
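
The added logging illustrates a standard harness pattern: one failing
instruction must not kill the control loop, so the worker logs the full
traceback and reports the error in-band. A minimal standalone sketch of the
pattern, with SimpleNamespace standing in for the beam_fn_api_pb2 response
message:

    import logging
    import traceback
    from types import SimpleNamespace

    def process(worker, work_request):
        try:
            return worker.do_instruction(work_request)
        except Exception:  # keep the control loop alive on any failure
            logging.error(
                'Error processing instruction %s',
                work_request.instruction_id,
                exc_info=True)
            # Report the failure in-band so the runner can surface it.
            return SimpleNamespace(
                instruction_id=work_request.instruction_id,
                error=traceback.format_exc())
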
@@ -319,10 +324,10 @@ class SdkWorker(object):
     return response
 
   def create_execution_tree(self, descriptor):
-    if descriptor.primitive_transform:
-      return self.create_execution_tree_from_fn_api(descriptor)
-    else:
+    if descriptor.transforms:
       return self.create_execution_tree_from_runner_api(descriptor)
+    else:
+      return self.create_execution_tree_from_fn_api(descriptor)
 
   def create_execution_tree_from_runner_api(self, descriptor):
     # TODO(robertwb): Figure out the correct prefix to use for output counters
@@ -551,7 +556,15 @@ class BeamTransformFactory(object):
     return creator(self, transform_id, transform_proto, parameter, consumers)
 
   def get_coder(self, coder_id):
-    return self.context.coders.get_by_id(coder_id)
+    coder_proto = self.descriptor.codersyyy[coder_id]
+    if coder_proto.spec.spec.urn:
+      return self.context.coders.get_by_id(coder_id)
+    else:
+      # No URN; assume a cloud object encoded as JSON bytes.
+      return operation_specs.get_coder_from_spec(
+          json.loads(
+              proto_utils.unpack_Any(coder_proto.spec.spec.parameter,
+                                     wrappers_pb2.BytesValue).value))
 
   def get_output_coders(self, transform_proto):
     return {
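
The fallback branch treats the coder spec as a legacy Dataflow "cloud
object": a JSON dict carried as serialized bytes inside the proto Any field.
For illustration, such a dict might look like the following; the "kind:"
values show the shape only and are not a verified list for this revision:

    # A windowed-value coder described as a cloud object, with its
    # component coders nested under component_encodings.
    cloud_object = {
        '@type': 'kind:windowed_value',
        'component_encodings': [
            {'@type': 'kind:bytes'},
            {'@type': 'kind:global_window'},
        ],
    }
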
@@ -618,7 +631,8 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers):
 
 @BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue)
 def create(factory, transform_id, transform_proto, parameter, consumers):
-  source = pickler.loads(parameter.value)
+  # The Dataflow runner harness strips the base64 encoding.
+  source = pickler.loads(base64.b64encode(parameter.value))
   spec = operation_specs.WorkerRead(
       iobase.SourceBundle(1.0, source, None, None),
       [WindowedValueCoder(source.default_output_coder())])
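
Taken together with the fn_api_runner change above, the two edits keep the
pickled source intact end to end: the runner base64-decodes before sending,
the Dataflow runner harness strips one base64 layer in transit (per the
commit comment), and this worker-side b64encode restores the text that
pickler.loads expects. A sketch of the full path under those assumptions:

    import base64

    pickled = b'Z2xvYmFsIHNrZXRjaA=='     # assumed pickler.dumps output
    on_wire = base64.b64decode(pickled)   # runner side: fn_api_runner pre-decodes
    restored = base64.b64encode(on_wire)  # worker side: this create() re-encodes
    assert restored == pickled            # pickler.loads gets its input back
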