You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/13 15:57:12 UTC

[2/2] beam git commit: Actually test the fn_api_runner.

Actually test the fn_api_runner.

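The test suite was never actually run because of a typo: FnApiRunnerTest
subclassed the runner (MapTaskExecutorRunner) instead of the test case
(MapTaskExecutorRunnerTest), so it inherited no tests.
Also fix the breakage that accumulated in the code while the suite was not
being run.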


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e71eb66a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e71eb66a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e71eb66a

Branch: refs/heads/master
Commit: e71eb66ae319bdf0cdad1fe9b54662962c8e8f16
Parents: 7126fdc
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jun 9 16:44:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jun 13 08:56:36 2017 -0700

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py            | 16 ++++++----------
 .../runners/portability/fn_api_runner_test.py       |  4 ++--
 .../python/apache_beam/runners/worker/operations.py |  1 +
 .../python/apache_beam/runners/worker/sdk_worker.py |  2 +-
 4 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a83eae4..8c213ad 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -179,7 +179,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         # into the sdk worker, or an injection of the source object into the
         # sdk worker as data followed by an SDF that reads that source.
         if (isinstance(operation.source.source,
-                       worker_runner_base.InMemorySource)
+                       maptask_executor_runner.InMemorySource)
             and isinstance(operation.source.source.default_output_coder(),
                            WindowedValueCoder)):
           output_stream = create_OutputStream()
@@ -264,11 +264,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             element_coder.get_impl().encode_to_stream(
                 element, output_stream, True)
           elements_data = output_stream.get()
-          state_key = beam_fn_api_pb2.StateKey(function_spec_reference=view_id)
+          state_key = beam_fn_api_pb2.StateKey(key=view_id)
           state_handler.Clear(state_key)
-          state_handler.Append(
-              beam_fn_api_pb2.SimpleStateAppendRequest(
-                  state_key=state_key, data=[elements_data]))
+          state_handler.Append(state_key, elements_data)
 
       elif isinstance(operation, operation_specs.WorkerFlatten):
         fn = sdk_worker.pack_function_spec_data(
@@ -382,9 +380,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       return beam_fn_api_pb2.Elements.Data(
           data=''.join(self._all[self._to_key(state_key)]))
 
-    def Append(self, append_request):
-      self._all[self._to_key(append_request.state_key)].extend(
-          append_request.data)
+    def Append(self, state_key, data):
+      self._all[self._to_key(state_key)].extend(data)
 
     def Clear(self, state_key):
       try:
@@ -394,8 +391,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
     @staticmethod
     def _to_key(state_key):
-      return (state_key.function_spec_reference, state_key.window,
-              state_key.key)
+      return state_key.window, state_key.key
 
   class DirectController(object):
     """An in-memory controller for fn API control, state and data planes."""

http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 633602f..66d985a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -20,10 +20,10 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.runners.portability import fn_api_runner
-from apache_beam.runners.portability import maptask_executor_runner
+from apache_beam.runners.portability import maptask_executor_runner_test
 
 
-class FnApiRunnerTest(maptask_executor_runner.MapTaskExecutorRunner):
+class FnApiRunnerTest(maptask_executor_runner_test.MapTaskExecutorRunnerTest):
 
   def create_pipeline(self):
     return beam.Pipeline(runner=fn_api_runner.FnApiRunner())

http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index a44561d..c4f945b 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -129,6 +129,7 @@ class Operation(object):
         self.operation_name + '-finish')
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
+    self.scoped_metrics_container = None
 
   def start(self):
     """Start operation."""

http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 33f2b61..dc4f5c2 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -359,7 +359,7 @@ class SdkWorker(object):
               source=SideInputSource(
                   self.state_handler,
                   beam_fn_api_pb2.StateKey(
-                      function_spec_reference=si.view_fn.id),
+                      key=si.view_fn.id.encode('utf-8')),
                   coder=unpack_and_deserialize_py_fn(si.view_fn)))
         output_tags = list(transform.outputs.keys())
         spec = operation_specs.WorkerDoFn(