You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/17 03:24:00 UTC

[1/2] incubator-beam git commit: [BEAM-80] Enable combiner lifting for combine with contexts

Repository: incubator-beam
Updated Branches:
  refs/heads/master 6ba288d67 -> c199f0854


[BEAM-80] Enable combiner lifting for combine with contexts


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06b18fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06b18fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06b18fde

Branch: refs/heads/master
Commit: 06b18fde6ec2d092a9733e5bfcfa63de3cf00833
Parents: b2b5f42
Author: Pei He <pe...@gmail.com>
Authored: Thu Mar 10 14:17:36 2016 -0800
Committer: Pei He <pe...@gmail.com>
Committed: Thu Mar 10 14:17:36 2016 -0800

----------------------------------------------------------------------
 .../sdk/runners/DataflowPipelineTranslator.java   |  3 +++
 .../cloud/dataflow/sdk/transforms/Combine.java    | 18 +++---------------
 .../cloud/dataflow/sdk/util/PropertyNames.java    |  1 +
 3 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index d0cc4e5..0feae95 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -952,6 +952,9 @@ public class DataflowPipelineTranslator {
             context.addInput(
                 PropertyNames.SERIALIZED_FN,
                 byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+            context.addInput(
+                PropertyNames.IS_MERGING_WINDOW_FN,
+                !windowingStrategy.getWindowFn().isNonMerging());
           }
         });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
index cc0347a..b8d20e3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
@@ -1690,21 +1690,9 @@ public class Combine {
 
     @Override
     public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
-      if (fn instanceof RequiresContextInternal) {
-        return input
-            .apply(GroupByKey.<K, InputT>create(fewKeys))
-            .apply(ParDo.of(new DoFn<KV<K, Iterable<InputT>>, KV<K, Iterable<InputT>>>() {
-              @Override
-              public void processElement(ProcessContext c) throws Exception {
-                c.output(c.element());
-              }
-            }))
-            .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
-      } else {
-        return input
-            .apply(GroupByKey.<K, InputT>create(fewKeys))
-            .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
-      }
+      return input
+          .apply(GroupByKey.<K, InputT>create(fewKeys))
+          .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
index 5611fab..ec65189 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
@@ -65,6 +65,7 @@ public class PropertyNames {
   public static final String INPUTS = "inputs";
   public static final String INPUT_CODER = "input_coder";
   public static final String IS_GENERATED = "is_generated";
+  public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
   public static final String IS_PAIR_LIKE = "is_pair_like";
   public static final String IS_STREAM_LIKE = "is_stream_like";
   public static final String IS_WRAPPER = "is_wrapper";


[2/2] incubator-beam git commit: This closes #39

Posted by ke...@apache.org.
This closes #39


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c199f085
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c199f085
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c199f085

Branch: refs/heads/master
Commit: c199f085473cfcd79014d0a022b5ce3fdd4863ec
Parents: 6ba288d 06b18fd
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Mar 16 19:23:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Mar 16 19:23:48 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/DataflowPipelineTranslator.java   |  3 +++
 .../cloud/dataflow/sdk/transforms/Combine.java    | 18 +++---------------
 .../cloud/dataflow/sdk/util/PropertyNames.java    |  1 +
 3 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------