You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:55 UTC

[5/9] incubator-beam git commit: Implement windowed side inputs for InProcess runner.

Implement windowed side inputs for InProcess runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7386bcca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7386bcca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7386bcca

Branch: refs/heads/python-sdk
Commit: 7386bcca10d1e7905ea65ba68a53e78b6185d876
Parents: 66b4c2f
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Oct 12 17:37:52 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700

----------------------------------------------------------------------
 .../inprocess/inprocess_evaluation_context.py   | 20 +-------------------
 1 file changed, 1 insertion(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7386bcca/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index 883be99..c6bd41f 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -106,25 +106,7 @@ class _InProcessSideInputsContainer(object):
     Raises:
       ValueError: If values cannot be converted into the requested form.
     """
-    if isinstance(view, SingletonPCollectionView):
-      if len(values) == 0:
-        # pylint: disable=protected-access
-        result = view._view_options().get('default', EmptySideInput())
-      elif len(values) == 1:
-        result = values[0].value
-      else:
-        raise ValueError(
-            ('PCollection with more than one element accessed as '
-             'a singleton view: %s.') % view)
-    elif isinstance(view, IterablePCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, ListPCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, DictPCollectionView):
-      result = dict(v.value for v in values)
-    else:
-      raise NotImplementedError
-    return result
+    return sideinputs.SideInputMap(type(view), view._view_options(), values)
 
 
 class InProcessEvaluationContext(object):