Posted to commits@beam.apache.org by am...@apache.org on 2020/03/16 22:47:31 UTC

[beam] branch release-2.20.0 updated (3135924 -> 92706a9)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a change to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 3135924  Merge pull request #11117 [BEAM-9472] Remove excessive logging.
     new 8414c30  [BEAM-9465] Fire repeatedly in reshuffle
     new 92706a9  [BEAM-9485] Raise error when transform urn is not implemented

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 ++-
 sdks/python/apache_beam/transforms/util.py                  | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)


[beam] 02/02: [BEAM-9485] Raise error when transform urn is not implemented

Posted by am...@apache.org.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 92706a9f5d8b9212c77da021561b843e9899d9a6
Author: Ankur Goenka <an...@gmail.com>
AuthorDate: Tue Mar 10 23:45:36 2020 -0700

    [BEAM-9485] Raise error when transform urn is not implemented
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ac39cb9..6e258d0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -849,7 +849,8 @@ class DataflowRunner(PipelineRunner):
     if common_urns.primitives.PAR_DO.urn == urn:
       self.run_ParDo(transform_node, options)
     else:
-      NotImplementedError(urn)
+      raise NotImplementedError(
+          '%s uses unsupported URN: %s' % (transform_node.full_label, urn))
 
   def run_ParDo(self, transform_node, options):
     transform = transform_node.transform
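
The one-line fix above matters because, in Python, building an exception object without `raise` is a silent no-op: the object is created and thrown away, and the function falls through. The sketch below illustrates that behavior in isolation. It is not the DataflowRunner code; the function names, the label, and the URN strings are hypothetical placeholders chosen for the example.

```python
# Hypothetical placeholder URN, used only for this illustration.
KNOWN_URN = "example:pardo"


def dispatch_without_raise(urn):
    """Mimics the pre-fix shape: the exception is built but never raised."""
    if urn == KNOWN_URN:
        return "ran ParDo"
    else:
        # The exception object is created and immediately discarded,
        # so the function silently returns None for unknown URNs.
        NotImplementedError(urn)


def dispatch_with_raise(label, urn):
    """Mimics the post-fix shape: unknown URNs fail loudly with context."""
    if urn == KNOWN_URN:
        return "ran ParDo"
    else:
        raise NotImplementedError(
            '%s uses unsupported URN: %s' % (label, urn))


if __name__ == "__main__":
    print(dispatch_without_raise("example:unknown"))
    try:
        dispatch_with_raise("MyStep", "example:unknown")
    except NotImplementedError as exc:
        print("caught:", exc)
```

Including the transform label in the message, as the commit does, tells the reader which step of the pipeline hit the unsupported URN, not only which URN it was.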


[beam] 01/02: [BEAM-9465] Fire repeatedly in reshuffle

Posted by am...@apache.org.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8414c302f97e93fb0f5dd8195def77c35c463370
Author: Ankur Goenka <an...@gmail.com>
AuthorDate: Fri Mar 6 14:14:43 2020 -0800

    [BEAM-9465] Fire repeatedly in reshuffle
---
 sdks/python/apache_beam/transforms/util.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 361edbe..8fd3f72 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -64,6 +64,7 @@ from apache_beam.transforms.ptransform import ptransform_fn
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterCount
+from apache_beam.transforms.trigger import Repeatedly
 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.transforms.userstate import TimerSpec
@@ -678,7 +679,7 @@ class ReshufflePerKey(PTransform):
     # accept only standard coders.
     ungrouped._windowing = Windowing(
         window.GlobalWindows(),
-        triggerfn=AfterCount(1),
+        triggerfn=Repeatedly(AfterCount(1)),
         accumulation_mode=AccumulationMode.DISCARDING,
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
     result = (
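
The likely motivation for wrapping the trigger, going by the commit title, is that a bare `AfterCount(1)` fires once and then finishes, while `Repeatedly(...)` re-arms the inner trigger after every firing. The toy model below sketches that difference in plain Python. It does not use or reproduce the `apache_beam` trigger implementation; every class and function in it is a hypothetical stand-in, and it assumes that elements arriving after a trigger has finished are dropped.

```python
class OnceAfterCount:
    """Toy one-shot trigger: fires once `count` elements are seen, then finishes."""

    def __init__(self, count):
        self.count = count
        self.seen = 0
        self.finished = False

    def on_element(self):
        """Record one element; return True if the trigger fires now."""
        if self.finished:
            return False
        self.seen += 1
        if self.seen >= self.count:
            self.finished = True
            return True
        return False

    def reset(self):
        self.seen = 0
        self.finished = False


class RepeatedlyToy:
    """Toy wrapper: re-arms the inner trigger each time it fires."""

    finished = False  # a repeated trigger never finishes in this model

    def __init__(self, inner):
        self.inner = inner

    def on_element(self):
        fired = self.inner.on_element()
        if fired:
            self.inner.reset()
        return fired


def run_window(trigger, elements):
    """Return the panes emitted for one window in discarding mode."""
    panes, buffered = [], []
    for element in elements:
        if trigger.finished:
            continue  # assumption: input after a finished trigger is dropped
        buffered.append(element)
        if trigger.on_element():
            panes.append(buffered)
            buffered = []  # discarding mode: start the next pane empty
    return panes


if __name__ == "__main__":
    print(run_window(OnceAfterCount(1), ["a", "b", "c"]))
    print(run_window(RepeatedlyToy(OnceAfterCount(1)), ["a", "b", "c"]))
```

In this model the one-shot trigger emits a single pane and loses the later elements, whereas the repeated trigger emits one pane per element, which is the behavior a reshuffle needs if it is to pass every element through.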