You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/13 15:57:12 UTC
[2/2] beam git commit: Actually test the fn_api_runner.
Actually test the fn_api_runner.
The test suite was not being run due to a typo.
Fix breakage due to changes in the code in the meantime.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e71eb66a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e71eb66a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e71eb66a
Branch: refs/heads/master
Commit: e71eb66ae319bdf0cdad1fe9b54662962c8e8f16
Parents: 7126fdc
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jun 9 16:44:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jun 13 08:56:36 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 16 ++++++----------
.../runners/portability/fn_api_runner_test.py | 4 ++--
.../python/apache_beam/runners/worker/operations.py | 1 +
.../python/apache_beam/runners/worker/sdk_worker.py | 2 +-
4 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a83eae4..8c213ad 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -179,7 +179,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
# into the sdk worker, or an injection of the source object into the
# sdk worker as data followed by an SDF that reads that source.
if (isinstance(operation.source.source,
- worker_runner_base.InMemorySource)
+ maptask_executor_runner.InMemorySource)
and isinstance(operation.source.source.default_output_coder(),
WindowedValueCoder)):
output_stream = create_OutputStream()
@@ -264,11 +264,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
element_coder.get_impl().encode_to_stream(
element, output_stream, True)
elements_data = output_stream.get()
- state_key = beam_fn_api_pb2.StateKey(function_spec_reference=view_id)
+ state_key = beam_fn_api_pb2.StateKey(key=view_id)
state_handler.Clear(state_key)
- state_handler.Append(
- beam_fn_api_pb2.SimpleStateAppendRequest(
- state_key=state_key, data=[elements_data]))
+ state_handler.Append(state_key, elements_data)
elif isinstance(operation, operation_specs.WorkerFlatten):
fn = sdk_worker.pack_function_spec_data(
@@ -382,9 +380,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
return beam_fn_api_pb2.Elements.Data(
data=''.join(self._all[self._to_key(state_key)]))
- def Append(self, append_request):
- self._all[self._to_key(append_request.state_key)].extend(
- append_request.data)
+ def Append(self, state_key, data):
+ self._all[self._to_key(state_key)].extend(data)
def Clear(self, state_key):
try:
@@ -394,8 +391,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
@staticmethod
def _to_key(state_key):
- return (state_key.function_spec_reference, state_key.window,
- state_key.key)
+ return state_key.window, state_key.key
class DirectController(object):
"""An in-memory controller for fn API control, state and data planes."""
http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 633602f..66d985a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -20,10 +20,10 @@ import unittest
import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
-from apache_beam.runners.portability import maptask_executor_runner
+from apache_beam.runners.portability import maptask_executor_runner_test
-class FnApiRunnerTest(maptask_executor_runner.MapTaskExecutorRunner):
+class FnApiRunnerTest(maptask_executor_runner_test.MapTaskExecutorRunnerTest):
def create_pipeline(self):
return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index a44561d..c4f945b 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -129,6 +129,7 @@ class Operation(object):
self.operation_name + '-finish')
# TODO(ccy): the '-abort' state can be added when the abort is supported in
# Operations.
+ self.scoped_metrics_container = None
def start(self):
"""Start operation."""
http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 33f2b61..dc4f5c2 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -359,7 +359,7 @@ class SdkWorker(object):
source=SideInputSource(
self.state_handler,
beam_fn_api_pb2.StateKey(
- function_spec_reference=si.view_fn.id),
+ key=si.view_fn.id.encode('utf-8')),
coder=unpack_and_deserialize_py_fn(si.view_fn)))
output_tags = list(transform.outputs.keys())
spec = operation_specs.WorkerDoFn(