You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:55 UTC
[5/9] incubator-beam git commit: Implement windowed side inputs for
InProcess runner.
Implement windowed side inputs for InProcess runner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7386bcca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7386bcca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7386bcca
Branch: refs/heads/python-sdk
Commit: 7386bcca10d1e7905ea65ba68a53e78b6185d876
Parents: 66b4c2f
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Oct 12 17:37:52 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700
----------------------------------------------------------------------
.../inprocess/inprocess_evaluation_context.py | 20 +-------------------
1 file changed, 1 insertion(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7386bcca/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index 883be99..c6bd41f 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -106,25 +106,7 @@ class _InProcessSideInputsContainer(object):
Raises:
ValueError: If values cannot be converted into the requested form.
"""
- if isinstance(view, SingletonPCollectionView):
- if len(values) == 0:
- # pylint: disable=protected-access
- result = view._view_options().get('default', EmptySideInput())
- elif len(values) == 1:
- result = values[0].value
- else:
- raise ValueError(
- ('PCollection with more than one element accessed as '
- 'a singleton view: %s.') % view)
- elif isinstance(view, IterablePCollectionView):
- result = [v.value for v in values]
- elif isinstance(view, ListPCollectionView):
- result = [v.value for v in values]
- elif isinstance(view, DictPCollectionView):
- result = dict(v.value for v in values)
- else:
- raise NotImplementedError
- return result
+ return sideinputs.SideInputMap(type(view), view._view_options(), values)
class InProcessEvaluationContext(object):