You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/15 23:13:07 UTC

[1/2] incubator-beam git commit: Add GatherAllPanes PTransform

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0bb4f9c1e -> d440d9443


Add GatherAllPanes PTransform

This PTransform reifies and gathers all panes produced for each
window, outputting a single pane per window at the time the window is
garbage collected.

Intended for use in PAssert, to match against the final contents of each window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9daf999b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9daf999b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9daf999b

Branch: refs/heads/master
Commit: 9daf999bf966e62077a0b4ec37b97ab93e1d2bfc
Parents: 0bb4f9c
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 30 17:07:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Fri Apr 15 13:58:14 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/GatherAllPanes.java    |  74 +++++++++++
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |   7 +-
 .../beam/sdk/util/GatherAllPanesTest.java       | 132 +++++++++++++++++++
 3 files changed, 210 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9daf999b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
new file mode 100644
index 0000000..958d710
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Gathers all panes of each window into exactly one output.
+ *
+ * <p>
+ * Note that this will delay the output of a window until the garbage collection time (when the
+ * watermark passes the end of the window plus allowed lateness) even if the upstream triggers
+ * closed the window earlier.
+ */
+public class GatherAllPanes<T>
+    extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>> {
+  /**
+   * Gathers all panes of each window into a single output element.
+   *
+   * <p>
+   * This will gather all output panes into a single element, which causes them to be colocated on a
+   * single worker. As a result, this is only suitable for {@link PCollection PCollections} where
+   * all of the output elements for each window fit in memory, such as in tests.
+   */
+  public static <T> GatherAllPanes<T> globally() {
+    return new GatherAllPanes<>();
+  }
+
+  private GatherAllPanes() {}
+
+  /**
+   * Wraps every element in a {@link WindowedValue} (via {@link ReifyTimestampsAndWindows}), then
+   * groups all elements under a single {@code null} key so that each window produces one
+   * {@code Iterable} of its reified elements. The output uses the input's windowing strategy.
+   */
+  @Override
+  public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
+    WindowingStrategy<?, ?> inputStrategy = input.getWindowingStrategy();
+    WindowFn<?, ?> originalWindowFn = inputStrategy.getWindowFn();
+
+    return input
+        .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
+        .apply(new ReifyTimestampsAndWindows<Void, T>())
+        .apply(
+            Window.into(
+                    new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
+                        originalWindowFn.windowCoder(),
+                        originalWindowFn.assignsToSingleWindow()))
+                // Never.ever() suppresses every intermediate firing, so each window emits only
+                // the final pane produced when the window is garbage collected.
+                .triggering(Never.ever())
+                // Set lateness and accumulation mode explicitly: Window validation may reject a
+                // custom trigger when the input strategy did not specify them. Reusing the input's
+                // allowed lateness keeps the garbage-collection time unchanged.
+                .withAllowedLateness(inputStrategy.getAllowedLateness())
+                .discardingFiredPanes())
+        // all values have the same key so they all appear as a single output element
+        .apply(GroupByKey.<Void, WindowedValue<T>>create())
+        .apply(Values.<Iterable<WindowedValue<T>>>create())
+        .setWindowingStrategyInternal(inputStrategy);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9daf999b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index d77dd2d..65fc52d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -126,8 +126,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
   }
 
   /**
-   * Helper transform that makes timestamps and window assignments
-   * explicit in the value part of each key/value pair.
+   * Helper transform that makes timestamps and window assignments explicit in the value part of
+   * each key/value pair.
    */
   public static class ReifyTimestampsAndWindows<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
@@ -137,7 +137,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
 
       // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
       // to this implementation of GBK. All runners need a way to get the key.
-      checkArgument(input.getCoder() instanceof KvCoder,
+      checkArgument(
+          input.getCoder() instanceof KvCoder,
           "%s requires its input to use a %s",
           GroupByKey.class.getSimpleName(),
           KvCoder.class.getSimpleName());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9daf999b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
new file mode 100644
index 0000000..553d589
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link GatherAllPanes}.
+ */
+@RunWith(JUnit4.class)
+public class GatherAllPanesTest implements Serializable {
+  /**
+   * With only an end-of-window trigger and zero allowed lateness, the upstream {@link GroupByKey}
+   * is expected to emit one pane per window, so every gathered window must contain exactly one
+   * reified element.
+   */
+  @Test
+  public void singlePaneSingleReifiedPane() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
+        p.apply(CountingInput.upTo(20000))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<Long, Instant>() {
+                      @Override
+                      public Instant apply(Long input) {
+                        return new Instant(input * 10);
+                      }
+                    }))
+            .apply(
+                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .triggering(AfterWatermark.pastEndOfWindow())
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes())
+            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
+            .apply(GroupByKey.<Void, Long>create())
+            .apply(Values.<Iterable<Long>>create())
+            .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+    PAssert.that(accumulatedPanes)
+        .satisfies(
+            new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() {
+              @Override
+              public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) {
+                for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) {
+                  if (Iterables.size(windowedInput) != 1) {
+                    fail("Expected all windows to have exactly one pane, got " + windowedInput);
+                  }
+                }
+                return null;
+              }
+            });
+
+    // PAssert only registers the assertion; the pipeline must be run for it to be checked.
+    p.run();
+  }
+
+  /**
+   * With early firings after each element, the upstream {@link GroupByKey} can emit several panes
+   * per window, so at least one gathered window is expected to contain more than one reified
+   * element.
+   */
+  @Test
+  public void multiplePanesMultipleReifiedPane() {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
+        p.apply(CountingInput.upTo(20000))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<Long, Instant>() {
+                      @Override
+                      public Instant apply(Long input) {
+                        return new Instant(input * 10);
+                      }
+                    }))
+            .apply(
+                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes())
+            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
+            .apply(GroupByKey.<Void, Long>create())
+            .apply(Values.<Iterable<Long>>create())
+            .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+    PAssert.that(accumulatedPanes)
+        .satisfies(
+            new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() {
+              @Override
+              public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) {
+                for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) {
+                  if (Iterables.size(windowedInput) > 1) {
+                    return null;
+                  }
+                }
+                fail("Expected at least one window to have multiple panes");
+                return null;
+              }
+            });
+
+    // PAssert only registers the assertion; the pipeline must be run for it to be checked.
+    p.run();
+  }
+}


[2/2] incubator-beam git commit: This closes #180

Posted by bc...@apache.org.
This closes #180


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d440d944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d440d944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d440d944

Branch: refs/heads/master
Commit: d440d94434542c9bed3f095e0b23c1a889cd5852
Parents: 0bb4f9c 9daf999
Author: bchambers <bc...@google.com>
Authored: Fri Apr 15 13:58:18 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Fri Apr 15 13:58:18 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/GatherAllPanes.java    |  74 +++++++++++
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |   7 +-
 .../beam/sdk/util/GatherAllPanesTest.java       | 132 +++++++++++++++++++
 3 files changed, 210 insertions(+), 3 deletions(-)
----------------------------------------------------------------------