You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/17 03:24:00 UTC
[1/2] incubator-beam git commit: [BEAM-80] Enable combiner lifting for combine with contexts
Repository: incubator-beam
Updated Branches:
refs/heads/master 6ba288d67 -> c199f0854
[BEAM-80] Enable combiner lifting for combine with contexts
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06b18fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06b18fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06b18fde
Branch: refs/heads/master
Commit: 06b18fde6ec2d092a9733e5bfcfa63de3cf00833
Parents: b2b5f42
Author: Pei He <pe...@gmail.com>
Authored: Thu Mar 10 14:17:36 2016 -0800
Committer: Pei He <pe...@gmail.com>
Committed: Thu Mar 10 14:17:36 2016 -0800
----------------------------------------------------------------------
.../sdk/runners/DataflowPipelineTranslator.java | 3 +++
.../cloud/dataflow/sdk/transforms/Combine.java | 18 +++---------------
.../cloud/dataflow/sdk/util/PropertyNames.java | 1 +
3 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index d0cc4e5..0feae95 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -952,6 +952,9 @@ public class DataflowPipelineTranslator {
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+ context.addInput(
+ PropertyNames.IS_MERGING_WINDOW_FN,
+ !windowingStrategy.getWindowFn().isNonMerging());
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
index cc0347a..b8d20e3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
@@ -1690,21 +1690,9 @@ public class Combine {
@Override
public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
- if (fn instanceof RequiresContextInternal) {
- return input
- .apply(GroupByKey.<K, InputT>create(fewKeys))
- .apply(ParDo.of(new DoFn<KV<K, Iterable<InputT>>, KV<K, Iterable<InputT>>>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- }))
- .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
- } else {
- return input
- .apply(GroupByKey.<K, InputT>create(fewKeys))
- .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
- }
+ return input
+ .apply(GroupByKey.<K, InputT>create(fewKeys))
+ .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
index 5611fab..ec65189 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
@@ -65,6 +65,7 @@ public class PropertyNames {
public static final String INPUTS = "inputs";
public static final String INPUT_CODER = "input_coder";
public static final String IS_GENERATED = "is_generated";
+ public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
public static final String IS_PAIR_LIKE = "is_pair_like";
public static final String IS_STREAM_LIKE = "is_stream_like";
public static final String IS_WRAPPER = "is_wrapper";
[2/2] incubator-beam git commit: This closes #39
Posted by ke...@apache.org.
This closes #39
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c199f085
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c199f085
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c199f085
Branch: refs/heads/master
Commit: c199f085473cfcd79014d0a022b5ce3fdd4863ec
Parents: 6ba288d 06b18fd
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Mar 16 19:23:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Mar 16 19:23:48 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/DataflowPipelineTranslator.java | 3 +++
.../cloud/dataflow/sdk/transforms/Combine.java | 18 +++---------------
.../cloud/dataflow/sdk/util/PropertyNames.java | 1 +
3 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------