You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/15 23:13:07 UTC
[1/2] incubator-beam git commit: Add GatherAllPanes PTransform
Repository: incubator-beam
Updated Branches:
refs/heads/master 0bb4f9c1e -> d440d9443
Add GatherAllPanes PTransform
This PTransform reifies and gathers all panes produced for each
window, outputting a single pane per window at the time the window is
garbage collected.
For use in PAssert, to match against the final contents of each window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9daf999b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9daf999b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9daf999b
Branch: refs/heads/master
Commit: 9daf999bf966e62077a0b4ec37b97ab93e1d2bfc
Parents: 0bb4f9c
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 30 17:07:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Fri Apr 15 13:58:14 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/GatherAllPanes.java | 74 +++++++++++
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 7 +-
.../beam/sdk/util/GatherAllPanesTest.java | 132 +++++++++++++++++++
3 files changed, 210 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9daf999b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
new file mode 100644
index 0000000..958d710
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Gathers all panes of each window into exactly one output.
+ *
+ * <p>Each element is reified so that its timestamp and window assignment become explicit in a
+ * {@link WindowedValue}. The reified values are then re-windowed into their original windows with a
+ * trigger that never fires early, and grouped under a single key. As a result, every window produces
+ * one {@code Iterable} containing every pane that reached this transform.
+ *
+ * <p>Note that this will delay the output of a window until the garbage collection time (when the
+ * watermark passes the end of the window plus allowed lateness) even if the upstream triggers
+ * closed the window earlier.
+ */
+public class GatherAllPanes<T>
+    extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>> {
+  /**
+   * Gathers all panes of each window into a single output element.
+   *
+   * <p>This will gather all output panes into a single element, which causes them to be colocated
+   * on a single worker. As a result, this is only suitable for {@link PCollection PCollections}
+   * where all of the output elements for each window (across all of its panes) fit in memory, such
+   * as in tests.
+   *
+   * @param <T> the element type of the input {@link PCollection}
+   * @return a transform producing one {@code Iterable<WindowedValue<T>>} per window
+   */
+  public static <T> GatherAllPanes<T> globally() {
+    return new GatherAllPanes<>();
+  }
+
+  private GatherAllPanes() {}
+
+  @Override
+  public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
+    WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
+
+    return input
+        .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
+        .apply(new ReifyTimestampsAndWindows<Void, T>())
+        .apply(
+            Window.into(
+                    // Keep each value in the window it was already assigned to.
+                    new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
+                        originalWindowFn.windowCoder(), originalWindowFn.assignsToSingleWindow()))
+                // Never fire before expiry: the only pane is emitted when the window is collected.
+                .triggering(Never.ever())
+                // Specify lateness and accumulation mode explicitly instead of relying on the input
+                // strategy having set them. Reusing the input's allowed lateness keeps the
+                // garbage-collection time described in the class documentation.
+                .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+                .discardingFiredPanes())
+        // all values have the same key so they all appear as a single output element
+        .apply(GroupByKey.<Void, WindowedValue<T>>create())
+        .apply(Values.<Iterable<WindowedValue<T>>>create())
+        // Present the output with the caller's original windowing strategy.
+        .setWindowingStrategyInternal(input.getWindowingStrategy());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9daf999b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index d77dd2d..65fc52d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -126,8 +126,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
}
/**
- * Helper transform that makes timestamps and window assignments
- * explicit in the value part of each key/value pair.
+ * Helper transform that makes timestamps and window assignments explicit in the value part of
+ * each key/value pair.
*/
public static class ReifyTimestampsAndWindows<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
@@ -137,7 +137,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
// The requirement to use a KvCoder *is* actually a model-level requirement, not specific
// to this implementation of GBK. All runners need a way to get the key.
- checkArgument(input.getCoder() instanceof KvCoder,
+ checkArgument(
+ input.getCoder() instanceof KvCoder,
"%s requires its input to use a %s",
GroupByKey.class.getSimpleName(),
KvCoder.class.getSimpleName());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9daf999b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
new file mode 100644
index 0000000..553d589
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link GatherAllPanes}.
+ */
+@RunWith(JUnit4.class)
+public class GatherAllPanesTest implements Serializable {
+  /**
+   * With an on-time-only watermark trigger and zero allowed lateness, each upstream window fires a
+   * single pane. Every gathered window must therefore contain exactly one reified pane.
+   */
+  @Test
+  public void singlePaneSingleReifiedPane() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
+        p.apply(CountingInput.upTo(20000))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<Long, Instant>() {
+                      @Override
+                      public Instant apply(Long input) {
+                        return new Instant(input * 10);
+                      }
+                    }))
+            .apply(
+                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .triggering(AfterWatermark.pastEndOfWindow())
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes())
+            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
+            .apply(GroupByKey.<Void, Long>create())
+            .apply(Values.<Iterable<Long>>create())
+            .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+    PAssert.that(accumulatedPanes)
+        .satisfies(
+            new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() {
+              @Override
+              public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) {
+                for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) {
+                  if (Iterables.size(windowedInput) != 1) {
+                    fail("Expected all windows to have exactly one pane, got " + windowedInput);
+                  }
+                }
+                return null;
+              }
+            });
+
+    // PAssert only attaches checks to the pipeline; nothing is verified until the pipeline runs.
+    p.run();
+  }
+
+  /**
+   * With an early firing after every element, upstream windows are expected to fire several panes.
+   * At least one gathered window must therefore contain more than one reified pane.
+   */
+  @Test
+  public void multiplePanesMultipleReifiedPane() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
+        p.apply(CountingInput.upTo(20000))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<Long, Instant>() {
+                      @Override
+                      public Instant apply(Long input) {
+                        return new Instant(input * 10);
+                      }
+                    }))
+            .apply(
+                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes())
+            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
+            .apply(GroupByKey.<Void, Long>create())
+            .apply(Values.<Iterable<Long>>create())
+            .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+    PAssert.that(accumulatedPanes)
+        .satisfies(
+            new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() {
+              @Override
+              public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) {
+                for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) {
+                  if (Iterables.size(windowedInput) > 1) {
+                    return null;
+                  }
+                }
+                fail("Expected at least one window to have multiple panes");
+                return null;
+              }
+            });
+
+    // PAssert only attaches checks to the pipeline; nothing is verified until the pipeline runs.
+    p.run();
+  }
+}
[2/2] incubator-beam git commit: This closes #180
Posted by bc...@apache.org.
This closes #180
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d440d944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d440d944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d440d944
Branch: refs/heads/master
Commit: d440d94434542c9bed3f095e0b23c1a889cd5852
Parents: 0bb4f9c 9daf999
Author: bchambers <bc...@google.com>
Authored: Fri Apr 15 13:58:18 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Fri Apr 15 13:58:18 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/GatherAllPanes.java | 74 +++++++++++
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 7 +-
.../beam/sdk/util/GatherAllPanesTest.java | 132 +++++++++++++++++++
3 files changed, 210 insertions(+), 3 deletions(-)
----------------------------------------------------------------------