You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/27 17:18:19 UTC
[1/2] beam git commit: Enable grpc controller in fn_api_runner
Repository: beam
Updated Branches:
refs/heads/master e93c06485 -> 39074899a
Enable grpc controller in fn_api_runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8dd0077d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8dd0077d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8dd0077d
Branch: refs/heads/master
Commit: 8dd0077d2a58e278b11c7e7eb4b5f182e1400992
Parents: e93c064
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Jun 26 18:47:39 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 27 10:17:49 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 12 +++++++---
.../runners/portability/fn_api_runner_test.py | 23 +++++++++++++++++++-
.../apache_beam/runners/worker/sdk_worker.py | 2 +-
3 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a8e2eb4..c5438ad 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -174,12 +174,17 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
return {tag: pcollection_id(op_ix, out_ix)
for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
+ def only_element(iterable):
+ element, = iterable
+ return element
+
for op_ix, (stage_name, operation) in enumerate(map_task):
transform_id = uniquify(stage_name)
if isinstance(operation, operation_specs.WorkerInMemoryWrite):
# Write this data back to the runner.
- runner_sinks[(transform_id, 'out')] = operation
+ target_name = only_element(get_inputs(operation).keys())
+ runner_sinks[(transform_id, target_name)] = operation
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=sdk_worker.DATA_OUTPUT_URN,
parameter=proto_utils.pack_Any(data_operation_spec))
@@ -190,7 +195,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
maptask_executor_runner.InMemorySource)
and isinstance(operation.source.source.default_output_coder(),
WindowedValueCoder)):
- input_data[(transform_id, 'input')] = self._reencode_elements(
+ target_name = only_element(get_outputs(op_ix).keys())
+ input_data[(transform_id, target_name)] = self._reencode_elements(
operation.source.source.read(None),
operation.source.source.default_output_coder())
transform_spec = beam_runner_api_pb2.FunctionSpec(
@@ -309,7 +315,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
sink_op.output_buffer.append(e)
return
- def execute_map_tasks(self, ordered_map_tasks, direct=True):
+ def execute_map_tasks(self, ordered_map_tasks, direct=False):
if direct:
controller = FnApiRunner.DirectController()
else:
http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 9159035..163e980 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -21,6 +21,8 @@ import unittest
import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.portability import maptask_executor_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class FnApiRunnerTest(
@@ -31,9 +33,28 @@ class FnApiRunnerTest(
runner=fn_api_runner.FnApiRunner())
def test_combine_per_key(self):
- # TODO(robertwb): Implement PGBKCV operation.
+ # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
pass
+ def test_combine_per_key(self):
+ # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
+ pass
+
+ def test_pardo_side_inputs(self):
+ # TODO(BEAM-1348): Enable once side inputs are supported in fn API.
+ pass
+
+ def test_pardo_unfusable_side_inputs(self):
+ # TODO(BEAM-1348): Enable once side inputs are supported in fn API.
+ pass
+
+ def test_assert_that(self):
+ # TODO: figure out a way for fn_api_runner to parse and raise the
+ # underlying exception.
+ with self.assertRaisesRegexp(RuntimeError, 'BeamAssertException'):
+ with self.create_pipeline() as p:
+ assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+
# Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 6a366eb..e1ddfb7 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -415,7 +415,7 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers):
def create(factory, transform_id, transform_proto, grpc_port, consumers):
target = beam_fn_api_pb2.Target(
primitive_transform_reference=transform_id,
- name='out')
+ name=only_element(transform_proto.inputs.keys()))
return DataOutputOperation(
transform_proto.unique_name,
transform_proto.unique_name,
[2/2] beam git commit: This closes #3445
Posted by al...@apache.org.
This closes #3445
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39074899
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39074899
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39074899
Branch: refs/heads/master
Commit: 39074899a969a4feff0081da04cbd600f6480e93
Parents: e93c064 8dd0077
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jun 27 10:17:53 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 27 10:17:53 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 12 +++++++---
.../runners/portability/fn_api_runner_test.py | 23 +++++++++++++++++++-
.../apache_beam/runners/worker/sdk_worker.py | 2 +-
3 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------