You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:40 UTC
[16/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java
deleted file mode 100644
index 18f7a97..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Objects;
-
-/**
- * Provides information about the pane an element belongs to. Every pane is implicitly associated
- * with a window. Panes are observable only via the
- * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext#pane} method of the context
- * passed to a {@link DoFn#processElement} overridden method.
- *
- * <p>Note: This does not uniquely identify a pane, and should not be used for comparisons.
- */
-public final class PaneInfo {
- /**
- * Enumerates the possibilities for the timing of this pane firing related to the
- * input and output watermarks for its computation.
- *
- * <p>A window may fire multiple panes, and the timing of those panes generally follows the
- * regular expression {@code EARLY* ON_TIME? LATE*}. Generally a pane is considered:
- * <ol>
- * <li>{@code EARLY} if the system cannot be sure it has seen all data which may contribute
- * to the pane's window.
- * <li>{@code ON_TIME} if the system predicts it has seen all the data which may contribute
- * to the pane's window.
- * <li>{@code LATE} if the system has encountered new data after predicting no more could arrive.
- * It is possible an {@code ON_TIME} pane has already been emitted, in which case any
- * following panes are considered {@code LATE}.
- * </ol>
- *
- * <p>Only an
- * {@link AfterWatermark#pastEndOfWindow} trigger may produce an {@code ON_TIME} pane.
- * With merging {@link WindowFn}'s, windows may be merged to produce new windows that satisfy
- * their own instance of the above regular expression. The only guarantee is that once a window
- * produces a final pane, it will not be merged into any new windows.
- *
- * <p>The predictions above are made using the mechanism of watermarks.
- * See {@link com.google.cloud.dataflow.sdk.util.TimerInternals} for more information
- * about watermarks.
- *
- * <p>We can state some properties of {@code LATE} and {@code ON_TIME} panes, but first need some
- * definitions:
- * <ol>
- * <li>We'll call a pipeline 'simple' if it does not use
- * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#outputWithTimestamp} in
- * any {@code DoFn}, and it uses the same
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window.Bound#withAllowedLateness}
- * argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}).
- * <li>We'll call an element 'locally late', from the point of view of a computation on a
- * worker, if the element's timestamp is before the input watermark for that computation
- * on that worker. The element is otherwise 'locally on-time'.
- * <li>We'll say 'the pane's timestamp' to mean the timestamp of the element produced to
- * represent the pane's contents.
- * </ol>
- *
- * <p>Then in simple pipelines:
- * <ol>
- * <li> (Soundness) An {@code ON_TIME} pane can never cause a later computation to generate a
- * {@code LATE} pane. (If it did, it would imply a later computation's input watermark progressed
- * ahead of an earlier stage's output watermark, which by design is not possible.)
- * <li> (Liveness) An {@code ON_TIME} pane is emitted as soon as possible after the input
- * watermark passes the end of the pane's window.
- * <li> (Consistency) A pane with only locally on-time elements will always be {@code ON_TIME}.
- * And a {@code LATE} pane cannot contain locally on-time elements.
- * </ol>
- *
- * However, note that:
- * <ol>
- * <li> An {@code ON_TIME} pane may contain locally late elements. It may even contain only
- * locally late elements. Provided a locally late element finds its way into an {@code ON_TIME}
- * pane its lateness becomes unobservable.
- * <li> A {@code LATE} pane does not necessarily cause any following computation panes to be
- * marked as {@code LATE}.
- * </ol>
- */
- public enum Timing {
- /**
- * Pane was fired before the input watermark had progressed after the end of the window.
- */
- EARLY,
- /**
- * Pane was fired by a {@link AfterWatermark#pastEndOfWindow} trigger because the input
- * watermark progressed after the end of the window. However the output watermark has not
- * yet progressed after the end of the window. Thus it is still possible to assign a timestamp
- * to the element representing this pane which cannot be considered locally late by any
- * following computation.
- */
- ON_TIME,
- /**
- * Pane was fired after the output watermark had progressed past the end of the window.
- */
- LATE,
- /**
- * This element was not produced in a triggered pane and its relation to input and
- * output watermarks is unknown.
- */
- UNKNOWN;
-
- // NOTE: Do not add fields or re-order them. The ordinal is used as part of
- // the encoding.
- }
-
- private static byte encodedByte(boolean isFirst, boolean isLast, Timing timing) {
- byte result = 0x0;
- if (isFirst) {
- result |= 1;
- }
- if (isLast) {
- result |= 2;
- }
- result |= timing.ordinal() << 2;
- return result;
- }
-
- private static final ImmutableMap<Byte, PaneInfo> BYTE_TO_PANE_INFO;
- static {
- ImmutableMap.Builder<Byte, PaneInfo> decodingBuilder = ImmutableMap.builder();
- for (Timing timing : Timing.values()) {
- long onTimeIndex = timing == Timing.EARLY ? -1 : 0;
- register(decodingBuilder, new PaneInfo(true, true, timing, 0, onTimeIndex));
- register(decodingBuilder, new PaneInfo(true, false, timing, 0, onTimeIndex));
- register(decodingBuilder, new PaneInfo(false, true, timing, -1, onTimeIndex));
- register(decodingBuilder, new PaneInfo(false, false, timing, -1, onTimeIndex));
- }
- BYTE_TO_PANE_INFO = decodingBuilder.build();
- }
-
- private static void register(ImmutableMap.Builder<Byte, PaneInfo> builder, PaneInfo info) {
- builder.put(info.encodedByte, info);
- }
-
- private final byte encodedByte;
-
- private final boolean isFirst;
- private final boolean isLast;
- private final Timing timing;
- private final long index;
- private final long nonSpeculativeIndex;
-
- /**
- * {@code PaneInfo} to use for elements on (and before) initial window assignment (including
- * elements read from sources) before they have passed through a {@link GroupByKey} and are
- * associated with a particular trigger firing.
- */
- public static final PaneInfo NO_FIRING =
- PaneInfo.createPane(true, true, Timing.UNKNOWN, 0, 0);
-
- /**
- * {@code PaneInfo} to use when there will be exactly one firing and it is on time.
- */
- public static final PaneInfo ON_TIME_AND_ONLY_FIRING =
- PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0);
-
- private PaneInfo(boolean isFirst, boolean isLast, Timing timing, long index, long onTimeIndex) {
- this.encodedByte = encodedByte(isFirst, isLast, timing);
- this.isFirst = isFirst;
- this.isLast = isLast;
- this.timing = timing;
- this.index = index;
- this.nonSpeculativeIndex = onTimeIndex;
- }
-
- public static PaneInfo createPane(boolean isFirst, boolean isLast, Timing timing) {
- Preconditions.checkArgument(isFirst, "Indices must be provided for non-first pane info.");
- return createPane(isFirst, isLast, timing, 0, timing == Timing.EARLY ? -1 : 0);
- }
-
- /**
- * Factory method to create a {@link PaneInfo} with the specified parameters.
- */
- public static PaneInfo createPane(
- boolean isFirst, boolean isLast, Timing timing, long index, long onTimeIndex) {
- if (isFirst || timing == Timing.UNKNOWN) {
- return Preconditions.checkNotNull(
- BYTE_TO_PANE_INFO.get(encodedByte(isFirst, isLast, timing)));
- } else {
- return new PaneInfo(isFirst, isLast, timing, index, onTimeIndex);
- }
- }
-
- public static PaneInfo decodePane(byte encodedPane) {
- return Preconditions.checkNotNull(BYTE_TO_PANE_INFO.get(encodedPane));
- }
-
- /**
- * Return true if there is no timing information for the current {@link PaneInfo}.
- * This typically indicates that the current element has not been assigned to
- * windows or passed through an operation that executes triggers yet.
- */
- public boolean isUnknown() {
- return Timing.UNKNOWN.equals(timing);
- }
-
- /**
- * Return true if this is the first pane produced for the associated window.
- */
- public boolean isFirst() {
- return isFirst;
- }
-
- /**
- * Return true if this is the last pane that will be produced in the associated window.
- */
- public boolean isLast() {
- return isLast;
- }
-
- /**
- * Return the timing of this pane, as described by {@link Timing}.
- */
- public Timing getTiming() {
- return timing;
- }
-
- /**
- * The zero-based index of this trigger firing that produced this pane.
- *
- * <p>This will return 0 for the first time the trigger fires, 1 for the next time, etc.
- *
- * <p>A given (key, window, pane-index) is guaranteed to be unique in the
- * output of a group-by-key operation.
- */
- public long getIndex() {
- return index;
- }
-
- /**
- * The zero-based index of this trigger firing among non-speculative panes.
- *
- * <p> This will return 0 for the first non-{@link Timing#EARLY} trigger firing, 1 for the next one,
- * etc.
- *
- * <p>Always -1 for speculative data.
- */
- public long getNonSpeculativeIndex() {
- return nonSpeculativeIndex;
- }
-
- int getEncodedByte() {
- return encodedByte;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(encodedByte, index, nonSpeculativeIndex);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- // Simple PaneInfos are interned.
- return true;
- } else if (obj instanceof PaneInfo) {
- PaneInfo that = (PaneInfo) obj;
- return this.encodedByte == that.encodedByte
- && this.index == that.index
- && this.nonSpeculativeIndex == that.nonSpeculativeIndex;
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .omitNullValues()
- .add("isFirst", isFirst ? true : null)
- .add("isLast", isLast ? true : null)
- .add("timing", timing)
- .add("index", index)
- .add("onTimeIndex", nonSpeculativeIndex != -1 ? nonSpeculativeIndex : null)
- .toString();
- }
-
- /**
- * A Coder for encoding PaneInfo instances.
- */
- public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
- private static enum Encoding {
- FIRST,
- ONE_INDEX,
- TWO_INDICES;
-
- // NOTE: Do not reorder fields. The ordinal is used as part of
- // the encoding.
-
- public final byte tag;
-
- private Encoding() {
- assert ordinal() < 16;
- tag = (byte) (ordinal() << 4);
- }
-
- public static Encoding fromTag(byte b) {
- return Encoding.values()[b >> 4];
- }
- }
-
- private Encoding chooseEncoding(PaneInfo value) {
- if (value.index == 0 && value.nonSpeculativeIndex == 0 || value.timing == Timing.UNKNOWN) {
- return Encoding.FIRST;
- } else if (value.index == value.nonSpeculativeIndex || value.timing == Timing.EARLY) {
- return Encoding.ONE_INDEX;
- } else {
- return Encoding.TWO_INDICES;
- }
- }
-
- public static final PaneInfoCoder INSTANCE = new PaneInfoCoder();
-
- @Override
- public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- Encoding encoding = chooseEncoding(value);
- switch (chooseEncoding(value)) {
- case FIRST:
- outStream.write(value.encodedByte);
- break;
- case ONE_INDEX:
- outStream.write(value.encodedByte | encoding.tag);
- VarInt.encode(value.index, outStream);
- break;
- case TWO_INDICES:
- outStream.write(value.encodedByte | encoding.tag);
- VarInt.encode(value.index, outStream);
- VarInt.encode(value.nonSpeculativeIndex, outStream);
- break;
- default:
- throw new CoderException("Unknown encoding " + encoding);
- }
- }
-
- @Override
- public PaneInfo decode(final InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- byte keyAndTag = (byte) inStream.read();
- PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F));
- long index, onTimeIndex;
- switch (Encoding.fromTag(keyAndTag)) {
- case FIRST:
- return base;
- case ONE_INDEX:
- index = VarInt.decodeLong(inStream);
- onTimeIndex = base.timing == Timing.EARLY ? -1 : index;
- break;
- case TWO_INDICES:
- index = VarInt.decodeLong(inStream);
- onTimeIndex = VarInt.decodeLong(inStream);
- break;
- default:
- throw new CoderException("Unknown encoding " + (keyAndTag & 0xF0));
- }
- return new PaneInfo(base.isFirst, base.isLast, base.timing, index, onTimeIndex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java
deleted file mode 100644
index bea0285..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * A {@link WindowFn} that places each value into exactly one window based on its timestamp and
- * never merges windows.
- *
- * @param <T> type of elements being windowed
- * @param <W> window type
- */
-public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
- extends NonMergingWindowFn<T, W> {
- /**
- * Returns the single window to which elements with this timestamp belong.
- */
- public abstract W assignWindow(Instant timestamp);
-
- @Override
- public final Collection<W> assignWindows(AssignContext c) {
- return Arrays.asList(assignWindow(c.timestamp()));
- }
-
- @Override
- public W getSideInputWindow(final BoundedWindow window) {
- if (window instanceof GlobalWindow) {
- throw new IllegalArgumentException(
- "Attempted to get side input window for GlobalWindow from non-global WindowFn");
- }
- return assignWindow(window.maxTimestamp());
- }
-
- @Override
- public boolean assignsToSingleWindow() {
- return true;
- }
-
- @Override
- public Instant getOutputTime(Instant inputTimestamp, W window) {
- return inputTimestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
deleted file mode 100644
index e77e2a1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Repeat a trigger, either until some condition is met or forever.
- *
- * <p>For example, to fire after the end of the window, and every time late data arrives:
- * <pre> {@code
- * Repeatedly.forever(AfterWatermark.pastEndOfWindow());
- * } </pre>
- *
- * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
- * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- * {@code Trigger}
- */
-public class Repeatedly<W extends BoundedWindow> extends Trigger<W> {
-
- private static final int REPEATED = 0;
-
- /**
- * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each
- * time it fires and ignoring any indications to finish.
- *
- * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish.
- *
- * @param repeated the trigger to execute repeatedly.
- */
- public static <W extends BoundedWindow> Repeatedly<W> forever(Trigger<W> repeated) {
- return new Repeatedly<W>(repeated);
- }
-
- private Repeatedly(Trigger<W> repeated) {
- super(Arrays.asList(repeated));
- }
-
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- getRepeated(c).invokeOnElement(c);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- getRepeated(c).invokeOnMerge(c);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(W window) {
- // This trigger fires once the repeated trigger fires.
- return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
- }
-
- @Override
- public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
- return new Repeatedly<W>(continuationTriggers.get(REPEATED));
- }
-
- @Override
- public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
- return getRepeated(context).invokeShouldFire(context);
- }
-
- @Override
- public void onFire(TriggerContext context) throws Exception {
- getRepeated(context).invokeOnFire(context);
-
- if (context.trigger().isFinished(REPEATED)) {
- context.trigger().setFinished(false, REPEATED);
- getRepeated(context).invokeClear(context);
- }
- }
-
- private ExecutableTrigger<W> getRepeated(TriggerContext context) {
- return context.trigger().subTrigger(REPEATED);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java
deleted file mode 100644
index da137c1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Duration;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Objects;
-
-/**
- * A {@link WindowFn} windowing values into sessions separated by {@link #gapDuration}-long
- * periods with no elements.
- *
- * <p>For example, in order to window data into sessions with at least 10 minute
- * gaps in between them:
- * <pre> {@code
- * PCollection<Integer> pc = ...;
- * PCollection<Integer> windowed_pc = pc.apply(
- * Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
- * } </pre>
- */
-public class Sessions extends WindowFn<Object, IntervalWindow> {
- /**
- * Duration of the gaps between sessions.
- */
- private final Duration gapDuration;
-
- /**
- * Creates a {@code Sessions} {@link WindowFn} with the specified gap duration.
- */
- public static Sessions withGapDuration(Duration gapDuration) {
- return new Sessions(gapDuration);
- }
-
- /**
- * Creates a {@code Sessions} {@link WindowFn} with the specified gap duration.
- */
- private Sessions(Duration gapDuration) {
- this.gapDuration = gapDuration;
- }
-
- @Override
- public Collection<IntervalWindow> assignWindows(AssignContext c) {
- // Assign each element into a window from its timestamp until gapDuration in the
- // future. Overlapping windows (representing elements within gapDuration of
- // each other) will be merged.
- return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
- }
-
- @Override
- public void mergeWindows(MergeContext c) throws Exception {
- MergeOverlappingIntervalWindows.mergeWindows(c);
- }
-
- @Override
- public Coder<IntervalWindow> windowCoder() {
- return IntervalWindow.getCoder();
- }
-
- @Override
- public boolean isCompatible(WindowFn<?, ?> other) {
- return other instanceof Sessions;
- }
-
- @Override
- public IntervalWindow getSideInputWindow(BoundedWindow window) {
- throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
- }
-
- @Experimental(Kind.OUTPUT_TIME)
- @Override
- public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
- return OutputTimeFns.outputAtEarliestInputTimestamp();
- }
-
- public Duration getGapDuration() {
- return gapDuration;
- }
-
- @Override
- public boolean equals(Object object) {
- if (!(object instanceof Sessions)) {
- return false;
- }
- Sessions other = (Sessions) object;
- return getGapDuration().equals(other.getGapDuration());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(gapDuration);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java
deleted file mode 100644
index b0066d6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A {@link WindowFn} that windows values into possibly overlapping fixed-size
- * timestamp-based windows.
- *
- * <p>For example, in order to window data into 10 minute windows that
- * update every minute:
- * <pre> {@code
- * PCollection<Integer> items = ...;
- * PCollection<Integer> windowedItems = items.apply(
- * Window.<Integer>into(SlidingWindows.of(Duration.standardMinutes(10))));
- * } </pre>
- */
-public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
-
- /**
- * Amount of time between generated windows.
- */
- private final Duration period;
-
- /**
- * Size of the generated windows.
- */
- private final Duration size;
-
- /**
- * Offset of the generated windows.
- * Windows start at time N * period + offset, where 0 is the epoch.
- */
- private final Duration offset;
-
- /**
- * Assigns timestamps into half-open intervals of the form
- * [N * period, N * period + size), where 0 is the epoch.
- *
- * <p>If {@link SlidingWindows#every} is not called, the period defaults
- * to the largest time unit smaller than the given duration. For example,
- * specifying a size of 5 seconds will result in a default period of 1 second.
- */
- public static SlidingWindows of(Duration size) {
- return new SlidingWindows(getDefaultPeriod(size), size, Duration.ZERO);
- }
-
- /**
- * Returns a new {@code SlidingWindows} with the original size, that assigns
- * timestamps into half-open intervals of the form
- * [N * period, N * period + size), where 0 is the epoch.
- */
- public SlidingWindows every(Duration period) {
- return new SlidingWindows(period, size, offset);
- }
-
- /**
- * Assigns timestamps into half-open intervals of the form
- * [N * period + offset, N * period + offset + size).
- *
- * @throws IllegalArgumentException if offset is not in [0, period)
- */
- public SlidingWindows withOffset(Duration offset) {
- return new SlidingWindows(period, size, offset);
- }
-
- private SlidingWindows(Duration period, Duration size, Duration offset) {
- if (offset.isShorterThan(Duration.ZERO)
- || !offset.isShorterThan(period)
- || !size.isLongerThan(Duration.ZERO)) {
- throw new IllegalArgumentException(
- "SlidingWindows WindowingStrategies must have 0 <= offset < period and 0 < size");
- }
- this.period = period;
- this.size = size;
- this.offset = offset;
- }
-
- @Override
- public Coder<IntervalWindow> windowCoder() {
- return IntervalWindow.getCoder();
- }
-
- @Override
- public Collection<IntervalWindow> assignWindows(AssignContext c) {
- List<IntervalWindow> windows =
- new ArrayList<>((int) (size.getMillis() / period.getMillis()));
- Instant timestamp = c.timestamp();
- long lastStart = lastStartFor(timestamp);
- for (long start = lastStart;
- start > timestamp.minus(size).getMillis();
- start -= period.getMillis()) {
- windows.add(new IntervalWindow(new Instant(start), size));
- }
- return windows;
- }
-
- /**
- * Return the earliest window that contains the end of the main-input window.
- */
- @Override
- public IntervalWindow getSideInputWindow(final BoundedWindow window) {
- if (window instanceof GlobalWindow) {
- throw new IllegalArgumentException(
- "Attempted to get side input window for GlobalWindow from non-global WindowFn");
- }
- long lastStart = lastStartFor(window.maxTimestamp().minus(size));
- return new IntervalWindow(new Instant(lastStart + period.getMillis()), size);
- }
-
- @Override
- public boolean isCompatible(WindowFn<?, ?> other) {
- return equals(other);
- }
-
- /**
- * Return the last start of a sliding window that contains the timestamp.
- */
- private long lastStartFor(Instant timestamp) {
- return timestamp.getMillis()
- - timestamp.plus(period).minus(offset).getMillis() % period.getMillis();
- }
-
- static Duration getDefaultPeriod(Duration size) {
- if (size.isLongerThan(Duration.standardHours(1))) {
- return Duration.standardHours(1);
- }
- if (size.isLongerThan(Duration.standardMinutes(1))) {
- return Duration.standardMinutes(1);
- }
- if (size.isLongerThan(Duration.standardSeconds(1))) {
- return Duration.standardSeconds(1);
- }
- return Duration.millis(1);
- }
-
- public Duration getPeriod() {
- return period;
- }
-
- public Duration getSize() {
- return size;
- }
-
- public Duration getOffset() {
- return offset;
- }
-
- /**
- * Ensures that later sliding windows have an output time that is past the end of earlier windows.
- *
- * <p>If this is the earliest sliding window containing {@code inputTimestamp}, that's fine.
- * Otherwise, we pick the earliest time that doesn't overlap with earlier windows.
- */
- @Experimental(Kind.OUTPUT_TIME)
- @Override
- public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
- return new OutputTimeFn.Defaults<BoundedWindow>() {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
- Instant startOfLastSegment = window.maxTimestamp().minus(period);
- return startOfLastSegment.isBefore(inputTimestamp)
- ? inputTimestamp
- : startOfLastSegment.plus(1);
- }
-
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- };
- }
-
- @Override
- public boolean equals(Object object) {
- if (!(object instanceof SlidingWindows)) {
- return false;
- }
- SlidingWindows other = (SlidingWindows) object;
- return getOffset().equals(other.getOffset())
- && getSize().equals(other.getSize())
- && getPeriod().equals(other.getPeriod());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(size, offset, period);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
deleted file mode 100644
index 4471563..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.common.base.Joiner;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * {@code Trigger}s control when the elements for a specific key and window are output. As elements
- * arrive, they are put into one or more windows by a {@link Window} transform and its associated
- * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the
- * {@code Window}'s contents should be output.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} and {@link Window}
- * for more information about how grouping with windows works.
- *
- * <p>The elements that are assigned to a window since the last time it was fired (or since the
- * window was created) are placed into the current window pane. Triggers are evaluated against the
- * elements as they are added. When the root trigger fires, the elements in the current pane will be
- * output. When the root trigger finishes (indicating it will never fire again), the window is
- * closed and any new elements assigned to that window are discarded.
- *
- * <p>Several predefined {@code Trigger}s are provided:
- * <ul>
- * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
- * either the end of the window or the arrival of the first element in a pane.
- * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
- * (typically since the first element in a pane).
- * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as
- * the number of elements that have been assigned to the current pane.
- * </ul>
- *
- * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
- * <ul>
- * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its
- * argument finishes it gets reset and starts over. Can be combined with
- * {@link Trigger#orFinally} to specify a condition that causes the repetition to stop.
- * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every)
- * time that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
- * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
- * fires. An {@link AfterFirst} trigger finishes after it fires once.
- * <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments
- * have fired at least once. An {@link AfterAll} trigger finishes after it fires once.
- * </ul>
- *
- * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
- * of the following states:
- * <ul>
- * <li> Never Existed - before the trigger has started executing, there is no state associated
- * with it anywhere in the system. A trigger moves to the executing state as soon as it
- * processes in the current pane.
- * <li> Executing - while the trigger is receiving items and may fire. While it is in this state,
- * it may persist book-keeping information to persisted state, set timers, etc.
- * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
- * system remembers only that it is finished. Entering this state causes us to discard any
- * elements in the buffer for that window, as well.
- * </ul>
- *
- * <p>Once finished, a trigger cannot return itself to an earlier state; however, a composite
- * trigger could reset its sub-triggers.
- *
- * <p>Triggers should not build up any state internally since they may be recreated
- * between invocations of the callbacks. All important values should be persisted using
- * state before the callback returns.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- * {@code Trigger}
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class Trigger<W extends BoundedWindow> implements Serializable, TriggerBuilder<W> {
-
- /**
- * Interface for accessing information about the trigger being executed and other triggers in the
- * same tree.
- */
- public interface TriggerInfo<W extends BoundedWindow> {
-
- /**
- * Returns true if the windowing strategy of the current {@code PCollection} is a merging
- * WindowFn. If true, the trigger execution needs to keep enough information to support the
- * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will
- * never be called.
- */
- boolean isMerging();
-
- /**
- * Access the executable versions of the sub-triggers of the current trigger.
- */
- Iterable<ExecutableTrigger<W>> subTriggers();
-
- /**
- * Access the executable version of the specified sub-trigger.
- */
- ExecutableTrigger<W> subTrigger(int subtriggerIndex);
-
- /**
- * Returns true if the current trigger is marked finished.
- */
- boolean isFinished();
-
- /**
- * Return true if the given subtrigger is marked finished.
- */
- boolean isFinished(int subtriggerIndex);
-
- /**
- * Returns true if all the sub-triggers of the current trigger are marked finished.
- */
- boolean areAllSubtriggersFinished();
-
- /**
- * Returns an iterable over the unfinished sub-triggers of the current trigger.
- */
- Iterable<ExecutableTrigger<W>> unfinishedSubTriggers();
-
- /**
- * Returns the first unfinished sub-trigger.
- */
- ExecutableTrigger<W> firstUnfinishedSubTrigger();
-
- /**
- * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
- * finished bits.
- */
- void resetTree() throws Exception;
-
- /**
- * Sets the finished bit for the current trigger.
- */
- void setFinished(boolean finished);
-
- /**
- * Sets the finished bit for the given sub-trigger.
- */
- void setFinished(boolean finished, int subTriggerIndex);
- }
-
- /**
- * Interact with properties of the trigger being executed, with extensions to deal with the
- * merging windows.
- */
- public interface MergingTriggerInfo<W extends BoundedWindow> extends TriggerInfo<W> {
-
- /** Return true if the trigger is finished in any window being merged. */
- public abstract boolean finishedInAnyMergingWindow();
-
- /** Return true if the trigger is finished in all windows being merged. */
- public abstract boolean finishedInAllMergingWindows();
-
- /** Return the merging windows in which the trigger is finished. */
- public abstract Iterable<W> getFinishedMergingWindows();
- }
-
- /**
- * Information accessible to all operational hooks in this {@code Trigger}.
- *
- * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and
- * extended with additional information in other methods.
- */
- public abstract class TriggerContext {
-
- /** Returns the interface for accessing trigger info. */
- public abstract TriggerInfo<W> trigger();
-
- /** Returns the interface for accessing persistent state. */
- public abstract StateAccessor<?> state();
-
- /** The window that the current context is executing in. */
- public abstract W window();
-
- /** Create a sub-context for the given sub-trigger. */
- public abstract TriggerContext forTrigger(ExecutableTrigger<W> trigger);
-
- /**
- * Removes the timer set in this trigger context for the given {@link Instant}
- * and {@link TimeDomain}.
- */
- public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
-
- /** The current processing time. */
- public abstract Instant currentProcessingTime();
-
- /** The current synchronized upstream processing time or {@code null} if unknown. */
- @Nullable
- public abstract Instant currentSynchronizedProcessingTime();
-
- /** The current event time for the input or {@code null} if unknown. */
- @Nullable
- public abstract Instant currentEventTime();
- }
-
- /**
- * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
- * operational hook.
- */
- public abstract class OnElementContext extends TriggerContext {
- /** The event timestamp of the element currently being processed. */
- public abstract Instant eventTimestamp();
-
- /**
- * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
- * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
- *
- * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
- * timer firings for a window will be received, but the implementation should choose to ignore
- * those that are not applicable.
- *
- * @param timestamp the time at which the trigger should be re-evaluated
- * @param domain the domain that the {@code timestamp} applies to
- */
- public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
- /** Create an {@code OnElementContext} for executing the given trigger. */
- @Override
- public abstract OnElementContext forTrigger(ExecutableTrigger<W> trigger);
- }
-
- /**
- * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
- * operational hook.
- */
- public abstract class OnMergeContext extends TriggerContext {
- /**
- * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
- * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
- *
- * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
- * timer firings for a window will be received, but the implementation should choose to ignore
- * those that are not applicable.
- *
- * @param timestamp the time at which the trigger should be re-evaluated
- * @param domain the domain that the {@code timestamp} applies to
- */
- public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
- /** Create an {@code OnMergeContext} for executing the given trigger. */
- @Override
- public abstract OnMergeContext forTrigger(ExecutableTrigger<W> trigger);
-
- @Override
- public abstract MergingStateAccessor<?, W> state();
-
- @Override
- public abstract MergingTriggerInfo<W> trigger();
- }
-
- @Nullable
- protected final List<Trigger<W>> subTriggers;
-
- protected Trigger(@Nullable List<Trigger<W>> subTriggers) {
- this.subTriggers = subTriggers;
- }
-
-
- /**
- * Called immediately after an element is first incorporated into a window.
- */
- public abstract void onElement(OnElementContext c) throws Exception;
-
- /**
- * Called immediately after windows have been merged.
- *
- * <p>Leaf triggers should update their state by inspecting their status and any state
- * in the merging windows. Composite triggers should update their state by calling
- * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
- *
- * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
- * it is the responsibility of the trigger itself to record this fact. It is forbidden for
- * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
- * elements that led to it being ready to fire.
- *
- * <p>The implementation does not need to clear out any state associated with the old windows.
- */
- public abstract void onMerge(OnMergeContext c) throws Exception;
-
- /**
- * Returns {@code true} if the current state of the trigger indicates that its condition
- * is satisfied and it is ready to fire.
- */
- public abstract boolean shouldFire(TriggerContext context) throws Exception;
-
- /**
- * Adjusts the state of the trigger to be ready for the next pane. For example, a
- * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
- *
- * <p>If the trigger is finished, it is the responsibility of the trigger itself to
- * record that fact via the {@code context}.
- */
- public abstract void onFire(TriggerContext context) throws Exception;
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #onElement} call.
- */
- public void prefetchOnElement(StateAccessor<?> state) {
- if (subTriggers != null) {
- for (Trigger<W> trigger : subTriggers) {
- trigger.prefetchOnElement(state);
- }
- }
- }
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #onMerge} call.
- */
- public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
- if (subTriggers != null) {
- for (Trigger<W> trigger : subTriggers) {
- trigger.prefetchOnMerge(state);
- }
- }
- }
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #shouldFire} call.
- */
- public void prefetchShouldFire(StateAccessor<?> state) {
- if (subTriggers != null) {
- for (Trigger<W> trigger : subTriggers) {
- trigger.prefetchShouldFire(state);
- }
- }
- }
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #onFire} call.
- */
- public void prefetchOnFire(StateAccessor<?> state) {
- if (subTriggers != null) {
- for (Trigger<W> trigger : subTriggers) {
- trigger.prefetchOnFire(state);
- }
- }
- }
-
- /**
- * Clear any state associated with this trigger in the given window.
- *
- * <p>This is called after a trigger has indicated it will never fire again. The trigger system
- * keeps enough information to know that the trigger is finished, so this trigger should clear all
- * of its state.
- */
- public void clear(TriggerContext c) throws Exception {
- if (subTriggers != null) {
- for (ExecutableTrigger<W> trigger : c.trigger().subTriggers()) {
- trigger.invokeClear(c);
- }
- }
- }
-
- public Iterable<Trigger<W>> subTriggers() {
- return subTriggers;
- }
-
- /**
- * Return a trigger to use after a {@code GroupByKey} to preserve the
- * intention of this trigger. Specifically, triggers that are time based
- * and intended to provide speculative results should continue providing
- * speculative results. Triggers that fire once (or multiple times) should
- * continue firing once (or multiple times).
- */
- public Trigger<W> getContinuationTrigger() {
- if (subTriggers == null) {
- return getContinuationTrigger(null);
- }
-
- List<Trigger<W>> subTriggerContinuations = new ArrayList<>();
- for (Trigger<W> subTrigger : subTriggers) {
- subTriggerContinuations.add(subTrigger.getContinuationTrigger());
- }
- return getContinuationTrigger(subTriggerContinuations);
- }
-
- /**
- * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
- * is provided the continuation trigger of each of the sub-triggers.
- */
- protected abstract Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers);
-
- /**
- * Returns a bound in watermark time by which this trigger would have fired at least once
- * for a given window had there been input data. This is a static property of a trigger
- * that does not depend on its state.
- *
- * <p>For triggers that do not fire based on the watermark advancing, returns
- * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>This estimate is used to determine that there are no elements in a side-input window, which
- * causes the default value to be used instead.
- */
- public abstract Instant getWatermarkThatGuaranteesFiring(W window);
-
- /**
- * Returns whether this performs the same triggering as the given {@code Trigger}.
- */
- public boolean isCompatible(Trigger<?> other) {
- if (!getClass().equals(other.getClass())) {
- return false;
- }
-
- if (subTriggers == null) {
- return other.subTriggers == null;
- } else if (other.subTriggers == null) {
- return false;
- } else if (subTriggers.size() != other.subTriggers.size()) {
- return false;
- }
-
- for (int i = 0; i < subTriggers.size(); i++) {
- if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- String simpleName = getClass().getSimpleName();
- if (getClass().getEnclosingClass() != null) {
- simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName;
- }
- if (subTriggers == null || subTriggers.size() == 0) {
- return simpleName;
- } else {
- return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")";
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof Trigger)) {
- return false;
- }
- @SuppressWarnings("unchecked")
- Trigger<W> that = (Trigger<W>) obj;
- return Objects.equals(getClass(), that.getClass())
- && Objects.equals(subTriggers, that.subTriggers);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), subTriggers);
- }
-
- /**
- * Specify an ending condition for this trigger. If the {@code until} fires then the combination
- * fires.
- *
- * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
- * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
- * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
- * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
- * has seen are necessarily in the current pane.
- *
- * <p>For example the final firing of the following trigger may only have 1 element:
- * <pre> {@code
- * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
- * .orFinally(AfterPane.elementCountAtLeast(5))
- * } </pre>
- *
- * <p>Note that if {@code t1} is a {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
- * as {@code AfterFirst.of(t1, t2)}.
- */
- public Trigger<W> orFinally(OnceTrigger<W> until) {
- return new OrFinallyTrigger<W>(this, until);
- }
-
- @Override
- public Trigger<W> buildTrigger() {
- return this;
- }
-
- /**
- * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
- * than the general {@link Trigger} class to indicate that behavior.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- * {@code OnceTrigger}
- */
- public abstract static class OnceTrigger<W extends BoundedWindow> extends Trigger<W> {
- protected OnceTrigger(List<Trigger<W>> subTriggers) {
- super(subTriggers);
- }
-
- @Override
- public final OnceTrigger<W> getContinuationTrigger() {
- Trigger<W> continuation = super.getContinuationTrigger();
- if (!(continuation instanceof OnceTrigger)) {
- throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger");
- }
- return (OnceTrigger<W>) continuation;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final void onFire(TriggerContext context) throws Exception {
- onOnlyFiring(context);
- context.trigger().setFinished(true);
- }
-
- /**
- * Called exactly once by {@link #onFire} when the trigger is fired. There is no default behavior:
- * each subclass must implement it, for example by invoking {@link #onFire} on all subtriggers
- * for which {@link #shouldFire} is {@code true}.
- */
- protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java
deleted file mode 100644
index cc817ba..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-/**
- * Anything that can be used to create an instance of a {@code Trigger} implements this interface.
- *
- * <p>This includes {@code Trigger}s (which can return themselves) and any "enhanced" syntax for
- * constructing a trigger.
- *
- * @param <W> The type of windows the built trigger will operate on.
- */
-public interface TriggerBuilder<W extends BoundedWindow> {
- /** Return the {@code Trigger} built by this builder. */
- Trigger<W> buildTrigger();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
deleted file mode 100644
index 6793e76..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-
-import javax.annotation.Nullable;
-
-/**
- * {@code Window} logically divides up or groups the elements of a
- * {@link PCollection} into finite windows according to a {@link WindowFn}.
- * The output of {@code Window} contains the same elements as input, but they
- * have been logically assigned to windows. The next
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey GroupByKeys},
- * including one within composite transforms, will group by the combination of
- * keys and windows.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}
- * for more information about how grouping with windows works.
- *
- * <h2> Windowing </h2>
- *
- * <p>Windowing a {@code PCollection} divides the elements into windows based
- * on the associated event time for each element. This is especially useful
- * for {@code PCollection}s with unbounded size, since it allows operating on
- * a sub-group of the elements placed into a related window. For {@code PCollection}s
- * with a bounded size (aka. conventional batch mode), by default, all data is
- * implicitly in a single window, unless {@code Window} is applied.
- *
- * <p>For example, a simple form of windowing divides up the data into
- * fixed-width time intervals, using {@link FixedWindows}.
- * The following example demonstrates how to use {@code Window} in a pipeline
- * that counts the number of occurrences of strings each minute:
- *
- * <pre> {@code
- * PCollection<String> items = ...;
- * PCollection<String> windowed_items = items.apply(
- * Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
- * PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
- * Count.<String>perElement());
- * } </pre>
- *
- * <p>Let (data, timestamp) denote a data element along with its timestamp.
- * Then, if the input to this pipeline consists of
- * {("foo", 15s), ("bar", 30s), ("foo", 45s), ("foo", 1m30s)},
- * the output will be
- * {(KV("foo", 2), 1m), (KV("bar", 1), 1m), (KV("foo", 1), 2m)}
- *
- * <p>Several predefined {@link WindowFn}s are provided:
- * <ul>
- * <li> {@link FixedWindows} partitions the timestamps into fixed-width intervals.
- * <li> {@link SlidingWindows} places data into overlapping fixed-width intervals.
- * <li> {@link Sessions} groups data into sessions where each item in a window
- * is separated from the next by no more than a specified gap.
- * </ul>
- *
- * <p>Additionally, custom {@link WindowFn}s can be created, by creating new
- * subclasses of {@link WindowFn}.
- *
- * <h2> Triggers </h2>
- *
- * <p>{@link Window.Bound#triggering(TriggerBuilder)} allows specifying a trigger to control when
- * (in processing time) results for the given window can be produced. If unspecified, the default
- * behavior is to trigger first when the watermark passes the end of the window, and then trigger
- * again every time there is late arriving data.
- *
- * <p>Elements are added to the current window pane as they arrive. When the root trigger fires,
- * output is produced based on the elements in the current pane.
- *
- * <p>Depending on the trigger, this can be used both to output partial results
- * early during the processing of the whole window, and to deal with late-arriving
- * data in batches.
- *
- * <p>Continuing the earlier example, suppose we wanted to emit the values that were available
- * when the watermark passed the end of the window, and then output any late-arriving elements
- * once per hour of processing time until we have finished processing the next 24 hours of data.
- * (The use of watermark time to stop processing tends to be more robust if the data source is slow
- * for a few days, etc.) We could do the following:
- *
- * <pre> {@code
- * PCollection<String> items = ...;
- * PCollection<String> windowed_items = items.apply(
- * Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
- * .triggering(
- * AfterWatermark.pastEndOfWindow()
- * .withLateFirings(AfterProcessingTime
- * .pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
- * .withAllowedLateness(Duration.standardDays(1)));
- * PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
- * Count.<String>perElement());
- * } </pre>
- *
- * <p>On the other hand, if we wanted to get early results every minute of processing
- * time (for which there were new elements in the given window) we could do the following:
- *
- * <pre> {@code
- * PCollection<String> windowed_items = items.apply(
- * Window.<String>into(
- * FixedWindows.of(Duration.standardMinutes(1)))
- * .triggering(
- * AfterWatermark.pastEndOfWindow()
- * .withEarlyFirings(AfterProcessingTime
- * .pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
- * .withAllowedLateness(Duration.ZERO));
- * } </pre>
- *
- * <p>After a {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} the trigger is set to
- * a trigger that will preserve the intent of the upstream trigger. See
- * {@link Trigger#getContinuationTrigger} for more information.
- *
- * <p>See {@link Trigger} for details on the available triggers.
- */
-public class Window {
-
- /**
- * Specifies the conditions under which a final pane will be created when a window is permanently
- * closed.
- */
- public enum ClosingBehavior {
- /**
- * Always fire the last pane. Even if there is no new data since the previous firing, an element
- * with {@link PaneInfo#isLast()} {@code true} will be produced.
- */
- FIRE_ALWAYS,
- /**
- * Only fire the last pane if there is new data since the previous firing.
- *
- * <p>This is the default behavior.
- */
- FIRE_IF_NON_EMPTY;
- }
-
- /**
- * Creates a {@code Window} {@code PTransform} with the given name.
- *
- * <p>See the discussion of Naming in
- * {@link com.google.cloud.dataflow.sdk.transforms.ParDo} for more explanation.
- *
- * <p>The resulting {@code PTransform} is incomplete, and its input/output
- * type is not yet bound. Use {@link Window.Unbound#into} to specify the
- * {@link WindowFn} to use, which will also bind the input/output type of this
- * {@code PTransform}.
- */
- public static Unbound named(String name) {
- return new Unbound().named(name);
- }
-
- /**
- * Creates a {@code Window} {@code PTransform} that uses the given
- * {@link WindowFn} to window the data.
- *
- * <p>The resulting {@code PTransform}'s types have been bound, with both the
- * input and output being a {@code PCollection<T>}, inferred from the types of
- * the argument {@code WindowFn}. It is ready to be applied, or further
- * properties can be set on it first.
- */
- public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
- return new Unbound().into(fn);
- }
-
- /**
- * Sets a non-default trigger for this {@code Window} {@code PTransform}.
- * Elements that are assigned to a specific window will be output when
- * the trigger fires.
- *
- * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
- * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> triggering(TriggerBuilder<?> trigger) {
- return new Unbound().triggering(trigger);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that discards elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> discardingFiredPanes() {
- return new Unbound().discardingFiredPanes();
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that accumulates elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> accumulatingFiredPanes() {
- return new Unbound().accumulatingFiredPanes();
- }
-
- /**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
- return new Unbound().withAllowedLateness(allowedLateness);
- }
-
- /**
- * An incomplete {@code Window} transform, with unbound input/output type.
- *
- * <p>Before being applied, {@link Window.Unbound#into} must be
- * invoked to specify the {@link WindowFn} to invoke, which will also
- * bind the input/output type of this {@code PTransform}.
- */
- public static class Unbound {
- String name;
-
- Unbound() {}
-
- Unbound(String name) {
- this.name = name;
- }
-
- /**
- * Returns a new {@code Window} transform that's like this
- * transform but with the specified name. Does not modify this
- * transform. The resulting transform is still incomplete.
- *
- * <p>See the discussion of Naming in
- * {@link com.google.cloud.dataflow.sdk.transforms.ParDo} for more
- * explanation.
- */
- public Unbound named(String name) {
- return new Unbound(name);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that's like this
- * transform but that will use the given {@link WindowFn}, and that has
- * its input and output types bound. Does not modify this transform. The
- * resulting {@code PTransform} is sufficiently specified to be applied,
- * but more properties can still be specified.
- */
- public <T> Bound<T> into(WindowFn<? super T, ?> fn) {
- return new Bound<T>(name).into(fn);
- }
-
- /**
- * Sets a non-default trigger for this {@code Window} {@code PTransform}.
- * Elements that are assigned to a specific window will be output when
- * the trigger fires.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger}
- * has more details on the available triggers.
- *
- * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
- * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
- */
- @Experimental(Kind.TRIGGER)
- public <T> Bound<T> triggering(TriggerBuilder<?> trigger) {
- return new Bound<T>(name).triggering(trigger);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that discards elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public <T> Bound<T> discardingFiredPanes() {
- return new Bound<T>(name).discardingFiredPanes();
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that accumulates elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public <T> Bound<T> accumulatingFiredPanes() {
- return new Bound<T>(name).accumulatingFiredPanes();
- }
-
- /**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- *
- * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
- * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
- */
- @Experimental(Kind.TRIGGER)
- public <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
- return new Bound<T>(name).withAllowedLateness(allowedLateness);
- }
-
- /**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- */
- @Experimental(Kind.TRIGGER)
- public <T> Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
- return new Bound<T>(name).withAllowedLateness(allowedLateness, behavior);
- }
- }
-
- /**
- * A {@code PTransform} that windows the elements of a {@code PCollection<T>},
- * into finite windows according to a user-specified {@code WindowFn}.
- *
- * @param <T> The type of elements this {@code Window} is applied to
- */
- public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
- @Nullable private final WindowFn<? super T, ?> windowFn;
- @Nullable private final Trigger<?> trigger;
- @Nullable private final AccumulationMode mode;
- @Nullable private final Duration allowedLateness;
- @Nullable private final ClosingBehavior closingBehavior;
- @Nullable private final OutputTimeFn<?> outputTimeFn;
-
- private Bound(String name,
- @Nullable WindowFn<? super T, ?> windowFn, @Nullable Trigger<?> trigger,
- @Nullable AccumulationMode mode, @Nullable Duration allowedLateness,
- ClosingBehavior behavior, @Nullable OutputTimeFn<?> outputTimeFn) {
- super(name);
- this.windowFn = windowFn;
- this.trigger = trigger;
- this.mode = mode;
- this.allowedLateness = allowedLateness;
- this.closingBehavior = behavior;
- this.outputTimeFn = outputTimeFn;
- }
-
- private Bound(String name) {
- this(name, null, null, null, null, null, null);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that's like this
- * transform but that will use the given {@link WindowFn}, and that has
- * its input and output types bound. Does not modify this transform. The
- * resulting {@code PTransform} is sufficiently specified to be applied,
- * but more properties can still be specified.
- */
- private Bound<T> into(WindowFn<? super T, ?> windowFn) {
- try {
- windowFn.windowCoder().verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new IllegalArgumentException("Window coders must be deterministic.", e);
- }
-
- return new Bound<>(
- name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that's like this
- * {@code PTransform} but with the specified name. Does not
- * modify this {@code PTransform}.
- *
- * <p>See the discussion of Naming in
- * {@link com.google.cloud.dataflow.sdk.transforms.ParDo} for more
- * explanation.
- */
- public Bound<T> named(String name) {
- return new Bound<>(
- name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
- }
-
- /**
- * Sets a non-default trigger for this {@code Window} {@code PTransform}.
- * Elements that are assigned to a specific window will be output when
- * the trigger fires.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger}
- * has more details on the available triggers.
- *
- * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
- * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> triggering(TriggerBuilder<?> trigger) {
- return new Bound<T>(
- name,
- windowFn,
- trigger.buildTrigger(),
- mode,
- allowedLateness,
- closingBehavior,
- outputTimeFn);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that discards elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> discardingFiredPanes() {
- return new Bound<T>(
- name,
- windowFn,
- trigger,
- AccumulationMode.DISCARDING_FIRED_PANES,
- allowedLateness,
- closingBehavior,
- outputTimeFn);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that accumulates elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> accumulatingFiredPanes() {
- return new Bound<T>(
- name,
- windowFn,
- trigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES,
- allowedLateness,
- closingBehavior,
- outputTimeFn);
- }
-
- /**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- *
- * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
- * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> withAllowedLateness(Duration allowedLateness) {
- return new Bound<T>(
- name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
- }
-
- /**
- * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
- * the output timestamp of values output from a {@link GroupByKey} operation.
- */
- @Experimental(Kind.OUTPUT_TIME)
- public Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
- return new Bound<T>(
- name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
- }
-
- /**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
- return new Bound<T>(name, windowFn, trigger, mode, allowedLateness, behavior, outputTimeFn);
- }
-
- /**
- * Get the output strategy of this {@link Window.Bound Window PTransform}. For internal use
- * only.
- */
- // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
- // casting between wildcards
- public WindowingStrategy<?, ?> getOutputStrategyInternal(
- WindowingStrategy<?, ?> inputStrategy) {
- WindowingStrategy<?, ?> result = inputStrategy;
- if (windowFn != null) {
- result = result.withWindowFn(windowFn);
- }
- if (trigger != null) {
- result = result.withTrigger(trigger);
- }
- if (mode != null) {
- result = result.withMode(mode);
- }
- if (allowedLateness != null) {
- result = result.withAllowedLateness(allowedLateness);
- }
- if (closingBehavior != null) {
- result = result.withClosingBehavior(closingBehavior);
- }
- if (outputTimeFn != null) {
- result = result.withOutputTimeFn(outputTimeFn);
- }
- return result;
- }
-
- /**
- * Get the {@link WindowFn} of this {@link Window.Bound Window PTransform}.
- */
- public WindowFn<? super T, ?> getWindowFn() {
- return windowFn;
- }
-
- @Override
- public void validate(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- getOutputStrategyInternal(input.getWindowingStrategy());
-
- // Make sure that the windowing strategy is complete & valid.
- if (outputStrategy.isTriggerSpecified()
- && !(outputStrategy.getTrigger().getSpec() instanceof DefaultTrigger)) {
- if (!(outputStrategy.getWindowFn() instanceof GlobalWindows)
- && !outputStrategy.isAllowedLatenessSpecified()) {
- throw new IllegalArgumentException("Except when using GlobalWindows,"
- + " calling .triggering() to specify a trigger requires that the allowed lateness be"
- + " specified using .withAllowedLateness() to set the upper bound on how late data"
- + " can arrive before being dropped. See Javadoc for more details.");
- }
-
- if (!outputStrategy.isModeSpecified()) {
- throw new IllegalArgumentException(
- "Calling .triggering() to specify a trigger requires that the accumulation mode be"
- + " specified using .discardingFiredPanes() or .accumulatingFiredPanes()."
- + " See Javadoc for more details.");
- }
- }
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- getOutputStrategyInternal(input.getWindowingStrategy());
- PCollection<T> output;
- if (windowFn != null) {
- // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- output = assignWindows(input, windowFn);
- } else {
- // If the windowFn didn't change, we just run a pass-through transform and then set the
- // new windowing strategy.
- output = input.apply(Window.<T>identity());
- }
- return output.setWindowingStrategyInternal(outputStrategy);
- }
-
- private <T, W extends BoundedWindow> PCollection<T> assignWindows(
- PCollection<T> input, WindowFn<? super T, W> windowFn) {
- return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<T, W>(windowFn)));
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
- }
-
- @Override
- protected String getKindString() {
- return "Window.Into()";
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- // Builds a ParDo named "Identity" whose DoFn outputs each input element as-is.
- private static <T> PTransform<PCollection<? extends T>, PCollection<T>> identity() {
- DoFn<T, T> passThrough = new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- };
- return ParDo.named("Identity").of(passThrough);
- }
-
- /**
- * Returns a new {@link Remerge} {@code PTransform}: one that leaves the assigned
- * windows unchanged, but will cause windows to be merged again as part of the next
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
- */
- public static <T> Remerge<T> remerge() {
- return new Remerge<>();
- }
-
- /**
- * {@code PTransform} that does not change assigned windows, but will cause
- * windows to be merged again as part of the next
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
- */
- public static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- // Work out the strategy first, then pass the elements through unchanged and attach it.
- WindowingStrategy<?, ?> remergedStrategy =
- getOutputWindowing(input.getWindowingStrategy());
- PCollection<T> passedThrough = input.apply(Window.<T>identity());
- return passedThrough.setWindowingStrategyInternal(remergedStrategy);
- }
-
- // Returns the input strategy unchanged unless its WindowFn is an InvalidWindows wrapper,
- // in which case the wrapped original WindowFn is put back.
- private <W extends BoundedWindow> WindowingStrategy<?, W> getOutputWindowing(
- WindowingStrategy<?, W> inputStrategy) {
- if (!(inputStrategy.getWindowFn() instanceof InvalidWindows)) {
- return inputStrategy;
- }
- @SuppressWarnings("unchecked")
- InvalidWindows<W> wrapper = (InvalidWindows<W>) inputStrategy.getWindowFn();
- return inputStrategy.withWindowFn(wrapper.getOriginalWindowFn());
- }
- }
-}