You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:07 UTC
[11/50] [abbrv] beam git commit: [BEAM-1502] GroupByKey should not return bare lists in DirectRunner.
[BEAM-1502] GroupByKey should not return bare lists in DirectRunner.
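Returning bare lists lets pipeline code rely on list-only behavior of the grouped values (for example, calling len() on them or indexing them), and those expectations break on other runners, which do not guarantee a list.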
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7059e5c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7059e5c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7059e5c
Branch: refs/heads/DSL_SQL
Commit: e7059e5cb3cd07855582641798c58fc3cf5cd682
Parents: 532256e
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 17 13:44:40 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 17 15:08:02 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 2 +-
sdks/python/apache_beam/transforms/trigger.py | 21 +++++++++++++++-----
3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 3a5f9b1..27b8120 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1136,7 +1136,7 @@ def model_group_by_key(contents, output_path):
grouped_words = words_and_counts | beam.GroupByKey()
# [END model_group_by_key_transform]
(grouped_words
- | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
+ | 'count words' >> beam.Map(lambda (word, counts): (word, sum(counts)))
| beam.io.WriteToText(output_path))
http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8018219..92b8737 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1017,7 +1017,7 @@ class CombineValuesDoFn(DoFn):
self.combinefn.apply(element[1], *args, **kwargs))]
# Add the elements into three accumulators (for testing of merge).
- elements = element[1]
+ elements = list(element[1])
accumulators = []
for k in range(3):
if len(elements) <= k:
http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index f77fa1a..c1fbfc5 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -24,6 +24,7 @@ from abc import ABCMeta
from abc import abstractmethod
import collections
import copy
+import itertools
from apache_beam.coders import observable
from apache_beam.transforms import combiners
@@ -878,6 +879,17 @@ class _UnwindowedValues(observable.ObservableMixin):
def __reduce__(self):
return list, (list(self),)
+ def __eq__(self, other):
+ if isinstance(other, collections.Iterable):
+ return all(
+ a == b
+ for a, b in itertools.izip_longest(self, other, fillvalue=object()))
+ else:
+ return NotImplemented
+
+ def __ne__(self, other):
+ return not self == other
+
class DefaultGlobalBatchTriggerDriver(TriggerDriver):
"""Breaks a bundles into window (pane)s according to the default triggering.
@@ -888,11 +900,10 @@ class DefaultGlobalBatchTriggerDriver(TriggerDriver):
pass
def process_elements(self, state, windowed_values, unused_output_watermark):
- if isinstance(windowed_values, list):
- unwindowed = [wv.value for wv in windowed_values]
- else:
- unwindowed = _UnwindowedValues(windowed_values)
- yield WindowedValue(unwindowed, MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE)
+ yield WindowedValue(
+ _UnwindowedValues(windowed_values),
+ MIN_TIMESTAMP,
+ self.GLOBAL_WINDOW_TUPLE)
def process_timer(self, window_id, name, time_domain, timestamp, state):
raise TypeError('Triggers never set or called for batch default windowing.')