You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/15 00:23:04 UTC
[1/2] incubator-beam git commit: Make IdentityWindowFn and NeverTrigger available
Repository: incubator-beam
Updated Branches:
refs/heads/master 6511ba28e -> 5bdea1e2b
Make IdentityWindowFn and NeverTrigger available
This will be used as part of the new PAssert.
IdentityWindowFn remains package-private to restrict usage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9a1efae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9a1efae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9a1efae
Branch: refs/heads/master
Commit: c9a1efae25455af1e47675938e296c716ec0fa0f
Parents: 6511ba2
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 31 14:34:06 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Apr 14 15:09:47 2016 -0700
----------------------------------------------------------------------
.../transforms/windowing/AfterWatermark.java | 46 +-------
.../beam/sdk/transforms/windowing/Never.java | 76 ++++++++++++
.../apache/beam/sdk/util/IdentityWindowFn.java | 116 +++++++++++++++++++
.../org/apache/beam/sdk/util/Reshuffle.java | 80 ++-----------
.../sdk/transforms/windowing/NeverTest.java | 56 +++++++++
5 files changed, 259 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index 5aca093..05c6eb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.sdk.util.ExecutableTrigger;
import org.apache.beam.sdk.util.TimeDomain;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
@@ -96,43 +95,6 @@ public class AfterWatermark {
TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger);
}
- /**
- * A trigger which never fires. Used for the "early" trigger when only a late trigger was
- * specified.
- */
- private static class NeverTrigger extends OnceTrigger {
-
- protected NeverTrigger() {
- super(null);
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception { }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception { }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
- throw new UnsupportedOperationException(
- String.format("%s should never fire", getClass().getSimpleName()));
- }
- }
private static class AfterWatermarkEarlyAndLate
extends Trigger
@@ -314,8 +276,7 @@ public class AfterWatermark {
* the given {@code Trigger} fires before the watermark has passed the end of the window.
*/
public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) {
- Preconditions.checkNotNull(earlyFirings,
- "Must specify the trigger to use for early firings");
+ checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
return new AfterWatermarkEarlyAndLate(earlyFirings, null);
}
@@ -324,9 +285,8 @@ public class AfterWatermark {
* the given {@code Trigger} fires after the watermark has passed the end of the window.
*/
public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) {
- Preconditions.checkNotNull(lateFirings,
- "Must specify the trigger to use for late firings");
- return new AfterWatermarkEarlyAndLate(new NeverTrigger(), lateFirings);
+ checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
+ return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
new file mode 100644
index 0000000..809e841
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * A trigger which never fires.
+ *
+ * <p>
+ * Using this trigger will only produce output when the watermark passes the end of the
+ * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness() allowed
+ * lateness}.
+ */
+public final class Never {
+ /**
+ * Returns a trigger which never fires. Output will be produced by the {@link GroupByKey} that
+ * uses this trigger when the {@link BoundedWindow} closes.
+ */
+ public static OnceTrigger ever() {
+ // NeverTrigger ignores all inputs and is Window-type independent.
+ return new NeverTrigger();
+ }
+
+ private static class NeverTrigger extends OnceTrigger {
+ protected NeverTrigger() {
+ super(null);
+ }
+
+ @Override
+ public void onElement(OnElementContext c) {}
+
+ @Override
+ public void onMerge(OnMergeContext c) {}
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) {
+ return false;
+ }
+
+ @Override
+ protected void onOnlyFiring(Trigger.TriggerContext context) {
+ throw new UnsupportedOperationException(
+ String.format("%s should never fire", getClass().getSimpleName()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
new file mode 100644
index 0000000..91e5609
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
+ *
+ * <p>This {@link WindowFn} is applied when elements must be passed through a {@link GroupByKey},
+ * but should maintain their existing {@link Window} assignments. Because windows may have been
+ * merged, the earlier {@link WindowFn} may not appropriately maintain the existing window
+ * assignments. For example, if the earlier {@link WindowFn} merges windows, after a
+ * {@link GroupByKey} the {@link WindowingStrategy} uses {@link InvalidWindows}, and no further
+ * {@link GroupByKey} can be applied without applying a new {@link WindowFn}. This {@link WindowFn}
+ * allows existing window assignments to be maintained across a single group by key, at which point
+ * the earlier {@link WindowingStrategy} should be restored.
+ *
+ * <p>This {@link WindowFn} is an internal implementation detail of sdk-provided utilities, and
+ * should not be used by {@link Pipeline} writers.
+ */
+class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
+
+ /**
+ * The coder of the type of windows of the input {@link PCollection}. This is not an arbitrary
+ * {@link BoundedWindow} {@link Coder}, but is safe to use for all windows assigned by this
+ * transform, as it should be the same coder used by the {@link WindowFn} that initially assigned
+ * these windows.
+ */
+ private final Coder<BoundedWindow> coder;
+ private final boolean assignsToSingleWindow;
+
+ public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean assignsToSingleWindow) {
+ // Safe because it is only used privately here.
+ // At every point where a window is returned or accepted, it has been provided
+ // by the prior WindowFn, so it is of the expected type.
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder;
+ this.coder = windowCoder;
+ this.assignsToSingleWindow = assignsToSingleWindow;
+ }
+
+ @Override
+ public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
+ throws Exception {
+ // The windows are provided by the prior WindowFn, which also provides the coder for them
+ @SuppressWarnings("unchecked")
+ Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows();
+ return priorWindows;
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.isCompatible() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
+ public Coder<BoundedWindow> windowCoder() {
+ // Safe because the previous WindowFn provides both the windows and the coder.
+ // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
+ return coder;
+ }
+
+ @Override
+ public boolean assignsToSingleWindow() {
+ return assignsToSingleWindow;
+ }
+
+ @Override
+ public BoundedWindow getSideInputWindow(BoundedWindow window) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.getSideInputWindow() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Deprecated
+ @Override
+ public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
+ return inputTimestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 09b2222..5c91326 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,22 +17,15 @@
*/
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.Collection;
/**
* A {@link PTransform} that returns a {@link PCollection} equivalent to its input but operationally
@@ -62,11 +55,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
- Window.Bound<KV<K, V>> rewindow = Window
- .<KV<K, V>>into(new PassThroughWindowFn<>(originalStrategy.getWindowFn()))
- .triggering(new ReshuffleTrigger<>())
- .discardingFiredPanes()
- .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+ Window.Bound<KV<K, V>> rewindow =
+ Window.<KV<K, V>>into(
+ new IdentityWindowFn<>(
+ originalStrategy.getWindowFn().windowCoder(),
+ originalStrategy.getWindowFn().assignsToSingleWindow()))
+ .triggering(new ReshuffleTrigger<>())
+ .discardingFiredPanes()
+ .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
return input.apply(rewindow)
.apply(GroupByKey.<K, V>create())
@@ -84,64 +80,4 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
}
}));
}
-
- /**
- * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
- *
- * <p>In order to implement all the abstract methods of {@link WindowFn}, this requires the
- * prior {@link WindowFn}, to which all auxiliary functionality is delegated.
- */
- private static class PassThroughWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
-
- /** The WindowFn prior to this. Used for its windowCoder, etc. */
- private final WindowFn<?, BoundedWindow> priorWindowFn;
-
- public PassThroughWindowFn(WindowFn<?, ?> priorWindowFn) {
- // Safe because it is only used privately here.
- // At every point where a window is returned or accepted, it has been provided
- // by priorWindowFn, so it is of the type expected.
- @SuppressWarnings("unchecked")
- WindowFn<?, BoundedWindow> internalWindowFn = (WindowFn<?, BoundedWindow>) priorWindowFn;
- this.priorWindowFn = internalWindowFn;
- }
-
- @Override
- public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
- throws Exception {
- // The windows are provided by priorWindowFn, which also provides the coder for them
- @SuppressWarnings("unchecked")
- Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows();
- return priorWindows;
- }
-
- @Override
- public boolean isCompatible(WindowFn<?, ?> other) {
- throw new UnsupportedOperationException(
- String.format("%s.isCompatible() should never be called."
- + " It is a private implementation detail of Reshuffle."
- + " This message indicates a bug in the Dataflow SDK.",
- getClass().getCanonicalName()));
- }
-
- @Override
- public Coder<BoundedWindow> windowCoder() {
- // Safe because priorWindowFn provides the windows also.
- // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
- return priorWindowFn.windowCoder();
- }
-
- @Override
- public BoundedWindow getSideInputWindow(BoundedWindow window) {
- throw new UnsupportedOperationException(
- String.format("%s.getSideInputWindow() should never be called."
- + " It is a private implementation detail of Reshuffle."
- + " This message indicates a bug in the Dataflow SDK.",
- getClass().getCanonicalName()));
- }
-
- @Override
- public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
- return inputTimestamp;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
new file mode 100644
index 0000000..222fe4e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Never}.
+ */
+@RunWith(JUnit4.class)
+public class NeverTest {
+ private SimpleTriggerTester<IntervalWindow> triggerTester;
+
+ @Before
+ public void setup() throws Exception {
+ triggerTester =
+ TriggerTester.forTrigger(
+ Never.<IntervalWindow>ever(), FixedWindows.of(Duration.standardMinutes(5)));
+ }
+
+ @Test
+ public void falseAfterEndOfWindow() throws Exception {
+ triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+ IntervalWindow window =
+ new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
+ assertThat(triggerTester.shouldFire(window), is(false));
+ triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ assertThat(triggerTester.shouldFire(window), is(false));
+ }
+}
[2/2] incubator-beam git commit: This closes #150
Posted by bc...@apache.org.
This closes #150
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bdea1e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bdea1e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bdea1e2
Branch: refs/heads/master
Commit: 5bdea1e2bfce7b34a877f198026065b08b36b760
Parents: 6511ba2 c9a1efa
Author: bchambers <bc...@google.com>
Authored: Thu Apr 14 15:09:54 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Apr 14 15:09:54 2016 -0700
----------------------------------------------------------------------
.../transforms/windowing/AfterWatermark.java | 46 +-------
.../beam/sdk/transforms/windowing/Never.java | 76 ++++++++++++
.../apache/beam/sdk/util/IdentityWindowFn.java | 116 +++++++++++++++++++
.../org/apache/beam/sdk/util/Reshuffle.java | 80 ++-----------
.../sdk/transforms/windowing/NeverTest.java | 56 +++++++++
5 files changed, 259 insertions(+), 115 deletions(-)
----------------------------------------------------------------------