You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:24 UTC
[10/12] incubator-beam git commit: Move some easy stuff into
runners/core-java
Move some easy stuff into runners/core-java
This change moves a set of classes with no dependents,
leaving them in the same Java packages for now.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0fef8e63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0fef8e63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0fef8e63
Branch: refs/heads/master
Commit: 0fef8e6349216374ef60ef1d3356bdbdcc6f32ee
Parents: efaad32
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 20 11:54:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 21 14:09:49 2016 -0700
----------------------------------------------------------------------
runners/core-java/pom.xml | 30 +
.../org/apache/beam/sdk/util/AssignWindows.java | 46 +
.../apache/beam/sdk/util/AssignWindowsDoFn.java | 75 +
.../beam/sdk/util/BatchTimerInternals.java | 140 ++
.../org/apache/beam/sdk/util/DoFnRunner.java | 62 +
.../apache/beam/sdk/util/DoFnRunnerBase.java | 558 +++++++
.../org/apache/beam/sdk/util/DoFnRunners.java | 144 ++
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 59 +
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 100 ++
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 212 +++
.../sdk/util/LateDataDroppingDoFnRunner.java | 147 ++
.../org/apache/beam/sdk/util/NonEmptyPanes.java | 150 ++
.../apache/beam/sdk/util/PaneInfoTracker.java | 154 ++
.../sdk/util/PushbackSideInputDoFnRunner.java | 115 ++
.../java/org/apache/beam/sdk/util/ReduceFn.java | 130 ++
.../beam/sdk/util/ReduceFnContextFactory.java | 497 ++++++
.../apache/beam/sdk/util/ReduceFnRunner.java | 985 ++++++++++++
.../apache/beam/sdk/util/SimpleDoFnRunner.java | 56 +
.../apache/beam/sdk/util/SystemReduceFn.java | 135 ++
.../org/apache/beam/sdk/util/TriggerRunner.java | 234 +++
.../org/apache/beam/sdk/util/WatermarkHold.java | 536 +++++++
.../beam/sdk/util/BatchTimerInternalsTest.java | 118 ++
.../sdk/util/GroupAlsoByWindowsProperties.java | 619 ++++++++
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 106 ++
.../util/LateDataDroppingDoFnRunnerTest.java | 117 ++
.../util/PushbackSideInputDoFnRunnerTest.java | 234 +++
.../beam/sdk/util/ReduceFnRunnerTest.java | 1448 ++++++++++++++++++
.../apache/beam/sdk/util/ReduceFnTester.java | 784 ++++++++++
.../beam/sdk/util/SimpleDoFnRunnerTest.java | 86 ++
.../beam/runners/direct/DirectGroupByKey.java | 2 +-
.../direct/GroupByKeyEvaluatorFactoryTest.java | 2 +-
.../GroupByKeyOnlyEvaluatorFactoryTest.java | 2 +-
.../org/apache/beam/sdk/util/AssignWindows.java | 46 -
.../apache/beam/sdk/util/AssignWindowsDoFn.java | 75 -
.../beam/sdk/util/BatchTimerInternals.java | 140 --
.../org/apache/beam/sdk/util/DoFnRunner.java | 62 -
.../apache/beam/sdk/util/DoFnRunnerBase.java | 558 -------
.../org/apache/beam/sdk/util/DoFnRunners.java | 144 --
.../apache/beam/sdk/util/GatherAllPanes.java | 1 -
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 59 -
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 100 --
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 247 ---
.../sdk/util/LateDataDroppingDoFnRunner.java | 147 --
.../org/apache/beam/sdk/util/NonEmptyPanes.java | 150 --
.../apache/beam/sdk/util/PaneInfoTracker.java | 154 --
.../sdk/util/PushbackSideInputDoFnRunner.java | 115 --
.../java/org/apache/beam/sdk/util/ReduceFn.java | 130 --
.../beam/sdk/util/ReduceFnContextFactory.java | 497 ------
.../apache/beam/sdk/util/ReduceFnRunner.java | 985 ------------
.../sdk/util/ReifyTimestampsAndWindows.java | 63 +
.../apache/beam/sdk/util/SimpleDoFnRunner.java | 56 -
.../apache/beam/sdk/util/SystemReduceFn.java | 135 --
.../org/apache/beam/sdk/util/TriggerRunner.java | 234 ---
.../org/apache/beam/sdk/util/WatermarkHold.java | 536 -------
.../beam/sdk/util/BatchTimerInternalsTest.java | 118 --
.../sdk/util/GroupAlsoByWindowsProperties.java | 619 --------
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 106 --
.../util/LateDataDroppingDoFnRunnerTest.java | 117 --
.../util/PushbackSideInputDoFnRunnerTest.java | 234 ---
.../beam/sdk/util/ReduceFnRunnerTest.java | 1448 ------------------
.../apache/beam/sdk/util/ReduceFnTester.java | 784 ----------
.../beam/sdk/util/SimpleDoFnRunnerTest.java | 86 --
62 files changed, 8143 insertions(+), 8086 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 8ede60b..1587a1a 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -197,7 +197,31 @@
<!-- build dependencies -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
<!-- test dependencies -->
+
+ <!-- Utilities such as WindowMatchers -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
@@ -205,6 +229,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
new file mode 100644
index 0000000..af28052
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
+ * {@link PCollection} to windows according to the provided {@link WindowFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+public class AssignWindows<T, W extends BoundedWindow>
+    extends PTransform<PCollection<T>, PCollection<T>> {
+
+  /** The {@link WindowFn} used to compute the windows of each input element. */
+  private final WindowFn<? super T, W> fn;
+
+  /**
+   * Creates a transform that re-windows its input using {@code fn}.
+   *
+   * @param fn the {@link WindowFn} that assigns windows to each element
+   */
+  public AssignWindows(WindowFn<? super T, W> fn) {
+    this.fn = fn;
+  }
+
+  /**
+   * Expands to a single {@link ParDo} named {@code "AssignWindows"} running an
+   * {@link AssignWindowsDoFn} over {@code input}.
+   */
+  @Override
+  public PCollection<T> apply(PCollection<T> input) {
+    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
new file mode 100644
index 0000000..caec40e
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * {@link DoFn} that tags elements of a PCollection with windows, according
+ * to the provided {@link WindowFn}.
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+@SystemDoFnInternal
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
+ private WindowFn<? super T, W> fn;
+
+ public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
+ this.fn =
+ checkNotNull(
+ fn,
+ "%s provided to %s cannot be null",
+ WindowFn.class.getSimpleName(),
+ AssignWindowsDoFn.class.getSimpleName());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processElement(final ProcessContext c) throws Exception {
+ Collection<W> windows =
+ ((WindowFn<T, W>) fn).assignWindows(
+ ((WindowFn<T, W>) fn).new AssignContext() {
+ @Override
+ public T element() {
+ return c.element();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return c.timestamp();
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return c.windowingInternals().windows();
+ }
+ });
+
+ c.windowingInternals()
+ .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
new file mode 100644
index 0000000..d0c0b2f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Instant;
+
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link TimerInternals} that keeps pending timers in priority queues (one for event-time
+ * timers, one shared by all other time domains) and fires them into a {@link ReduceFnRunner}
+ * as time is advanced.
+ *
+ * <p>A timer fires once the relevant time is strictly after its timestamp. After a timer has
+ * fired or been deleted, an identical timer may be set again.
+ */
+public class BatchTimerInternals implements TimerInternals {
+  /**
+   * Timers currently pending in either queue, used to deduplicate {@link #setTimer} calls.
+   * A timer leaves this set when it is deleted or fired.
+   */
+  private final Set<TimerData> existingTimers = new HashSet<>();
+
+  // Keep these queues separate so we can advance over them separately.
+  private final PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+  private final PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  private Instant inputWatermarkTime;
+  private Instant processingTime;
+
+  /** Returns the queue for {@code domain}: event time has its own, other domains share one. */
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
+  }
+
+  /**
+   * Creates timer internals starting at {@code processingTime}, with the input watermark at
+   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   */
+  public BatchTimerInternals(Instant processingTime) {
+    this.processingTime = processingTime;
+    this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  }
+
+  /** Schedules {@code timer} unless an identical timer is already pending. */
+  @Override
+  public void setTimer(TimerData timer) {
+    if (existingTimers.add(timer)) {
+      queue(timer.getDomain()).add(timer);
+    }
+  }
+
+  /** Cancels {@code timer} if it is pending; does nothing otherwise. */
+  @Override
+  public void deleteTimer(TimerData timer) {
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
+   * is already complete.
+   */
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return inputWatermarkTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    // The output watermark is always undefined in batch mode.
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .toString();
+  }
+
+  /**
+   * Moves the input watermark to {@code newInputWatermark} and fires, into {@code runner},
+   * every event-time timer whose timestamp is strictly before it.
+   *
+   * @throws IllegalStateException if {@code newInputWatermark} is before the current watermark
+   * @throws Exception if {@code runner} fails while handling a timer
+   */
+  public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
+      throws Exception {
+    Preconditions.checkState(!newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
+        newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+    advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
+  }
+
+  /**
+   * Moves processing time to {@code newProcessingTime} and fires, into {@code runner}, every
+   * non-event-time timer whose timestamp is strictly before it.
+   *
+   * @throws IllegalStateException if {@code newProcessingTime} is before the current time
+   * @throws Exception if {@code runner} fails while handling a timer
+   */
+  public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
+      throws Exception {
+    Preconditions.checkState(!newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
+    processingTime = newProcessingTime;
+    advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
+  }
+
+  /**
+   * Fires, in queue order, every timer of {@code domain} whose timestamp is strictly before
+   * {@code newTime}. Timers set by {@code runner} while firing are picked up by the same loop,
+   * so a timer re-set to a timestamp still before {@code newTime} fires again in this call.
+   */
+  private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
+      throws Exception {
+    PriorityQueue<TimerData> timers = queue(domain);
+    TimerData timer;
+    // Timers fire if the new time is ahead of the timer.
+    while ((timer = timers.peek()) != null && newTime.isAfter(timer.getTimestamp())) {
+      // Remove before firing -- from the queue AND the dedup set -- so that if the trigger
+      // adds another identical timer it is actually scheduled instead of being discarded
+      // as a duplicate of the timer that is firing.
+      timers.poll();
+      existingTimers.remove(timer);
+      runner.onTimer(timer);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
new file mode 100644
index 0000000..4ec8920
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A wrapper interface that represents the execution of a {@link DoFn}.
+ */
+public interface DoFnRunner<InputT, OutputT> {
+  /**
+   * Prepares and calls {@link DoFn#startBundle}.
+   */
+  void startBundle();
+
+  /**
+   * Calls {@link DoFn#processElement} with a {@link ProcessContext} containing the current
+   * element.
+   */
+  void processElement(WindowedValue<InputT> elem);
+
+  /**
+   * Calls {@link DoFn#finishBundle} and performs additional tasks, such as
+   * flushing in-memory states.
+   */
+  void finishBundle();
+
+  /**
+   * An internal interface for signaling that a {@link DoFn} requires late data dropping.
+   */
+  interface ReduceFnExecutor<K, InputT, OutputT, W> {
+    /**
+     * Gets this object as a {@link DoFn}.
+     *
+     * <p>Most implementors of this interface are expected to be {@link DoFn} instances, and will
+     * return themselves.
+     */
+    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
+
+    /**
+     * Returns an aggregator that tracks elements that are dropped due to being late.
+     */
+    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
new file mode 100644
index 0000000..1ebe72b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A base implementation of {@link DoFnRunner}.
+ *
+ * <p> Sub-classes should override {@link #invokeProcessElement}.
+ */
+public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  /** The {@link DoFn} whose bundle and element methods this runner invokes. */
+  public final DoFn<InputT, OutputT> fn;
+
+  /**
+   * The context passed to {@code startBundle}/{@code finishBundle} and backing every
+   * per-element process context.
+   */
+  public final DoFnContext<InputT, OutputT> context;
+
+  protected DoFnRunnerBase(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      CounterSet.AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    // Without a windowing strategy the context is built with no WindowFn.
+    WindowFn<?, ?> windowFn = null;
+    if (windowingStrategy != null) {
+      windowFn = windowingStrategy.getWindowFn();
+    }
+    this.fn = fn;
+    this.context =
+        new DoFnContext<>(
+            options,
+            fn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            addCounterMutator,
+            windowFn);
+  }
+
+  /**
+   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
+   * contexts such as the {@code DirectRunner}.
+   *
+   * <p>Outputs are grouped by {@link TupleTag}; within a tag they are kept in the order they
+   * were output.
+   */
+  public static class ListOutputManager implements OutputManager {
+
+    /** Outputs per tag; each tag's list is created on its first output. */
+    private final Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
+
+    /** Appends {@code output} to the list kept for {@code tag}, creating the list if needed. */
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+
+      if (outputList == null) {
+        outputList = Lists.newArrayList();
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        List<WindowedValue<?>> untypedList = (List) outputList;
+        outputLists.put(tag, untypedList);
+      }
+
+      outputList.add(output);
+    }
+
+    /**
+     * Returns everything output so far to {@code tag}, or an empty list if nothing was.
+     *
+     * <p>A non-empty result is the live backing list, not a copy.
+     */
+    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+      // Safe cast by design, inexpressible in Java without rawtypes
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
+    }
+  }
+
+  /**
+   * Invokes {@link DoFn#startBundle} with this runner's context; anything it throws is
+   * converted by {@link #wrapUserCodeException}.
+   */
+  @Override
+  public void startBundle() {
+    try {
+      fn.startBundle(context);
+    } catch (Throwable failure) {
+      // startBundle may run arbitrary user code, so any failure is treated as coming from it.
+      throw wrapUserCodeException(failure);
+    }
+  }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ if (elem.getWindows().size() <= 1
+ || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+ && context.sideInputReader.isEmpty())) {
+ invokeProcessElement(elem);
+ } else {
+ // We could modify the windowed value (and the processContext) to
+ // avoid repeated allocations, but this is more straightforward.
+ for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+ invokeProcessElement(windowedValue);
+ }
+ }
+ }
+
+ /**
+ * Invokes {@link DoFn#processElement} after certain pre-processings has been done in
+ * {@link DoFnRunnerBase#processElement}.
+ */
+ protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
+
+  /**
+   * Invokes {@link DoFn#finishBundle} with this runner's context; anything it throws is
+   * converted by {@link #wrapUserCodeException}.
+   */
+  @Override
+  public void finishBundle() {
+    try {
+      fn.finishBundle(context);
+    } catch (Throwable failure) {
+      // finishBundle may run arbitrary user code, so any failure is treated as coming from it.
+      throw wrapUserCodeException(failure);
+    }
+  }
+
+ /**
+ * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
+ *
+ * <p>The {@code output}/{@code sideOutput} methods of this context pass no windows, so
+ * {@code makeWindowedValue} assigns windows using {@link #windowFn}. The context also caps the
+ * number of distinct output tags (main tag included) at {@link #MAX_SIDE_OUTPUTS}.
+ *
+ * @param <InputT> the type of the DoFn's (main) input elements
+ * @param <OutputT> the type of the DoFn's (main) output elements
+ */
+ private static class DoFnContext<InputT, OutputT>
+ extends DoFn<InputT, OutputT>.Context {
+ /** Maximum number of distinct output tags, counting the main output tag. */
+ private static final int MAX_SIDE_OUTPUTS = 1000;
+
+ final PipelineOptions options;
+ final DoFn<InputT, OutputT> fn;
+ final SideInputReader sideInputReader;
+ final OutputManager outputManager;
+ final TupleTag<OutputT> mainOutputTag;
+ // May be null; output paths check for null before noting outputs with it.
+ final StepContext stepContext;
+ final CounterSet.AddCounterMutator addCounterMutator;
+ // May be null when the enclosing runner was built without a windowing strategy.
+ final WindowFn<?, ?> windowFn;
+
+ /**
+ * The set of known output tags, some of which may be undeclared, so we can throw an
+ * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+ */
+ private Set<TupleTag<?>> outputTags;
+
+ public DoFnContext(PipelineOptions options,
+ DoFn<InputT, OutputT> fn,
+ SideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ StepContext stepContext,
+ CounterSet.AddCounterMutator addCounterMutator,
+ WindowFn<?, ?> windowFn) {
+ // DoFn.Context is an inner class of DoFn, so its constructor needs the DoFn as the
+ // enclosing instance (qualified superclass constructor invocation).
+ fn.super();
+ this.options = options;
+ this.fn = fn;
+ this.sideInputReader = sideInputReader;
+ this.outputManager = outputManager;
+ this.mainOutputTag = mainOutputTag;
+ this.outputTags = Sets.newHashSet();
+
+ // Declared tags (main + side) are known up front and do not count as "new" outputs later.
+ outputTags.add(mainOutputTag);
+ for (TupleTag<?> sideOutputTag : sideOutputTags) {
+ outputTags.add(sideOutputTag);
+ }
+
+ this.stepContext = stepContext;
+ this.addCounterMutator = addCounterMutator;
+ this.windowFn = windowFn;
+ // NOTE(review): kept last -- presumably this creates aggregators through
+ // createAggregatorInternal, which reads addCounterMutator and stepContext assigned above.
+ // Confirm against DoFn.Context before reordering.
+ super.setupDelegateAggregators();
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ /**
+ * Builds a {@link WindowedValue} for {@code output}.
+ *
+ * <p>A null {@code timestamp} becomes {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Null
+ * {@code windows} are computed by {@link #windowFn}, which may consult only the caller's
+ * original (possibly null) timestamp; exceptions it throws are wrapped with
+ * {@link UserCodeException#wrap}.
+ */
+ <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+ T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+ // Remember what the caller passed: the WindowFn must see "no timestamp", not the default.
+ final Instant inputTimestamp = timestamp;
+
+ if (timestamp == null) {
+ timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ if (windows == null) {
+ try {
+ // The windowFn can never succeed at accessing the element, so its type does not
+ // matter here
+ // NOTE(review): if windowFn is null, the qualified "new AssignContext()" below throws
+ // NullPointerException, which the catch turns into a UserCodeException.
+ @SuppressWarnings("unchecked")
+ WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+ windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
+ @Override
+ public Object element() {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input element when none was available");
+ }
+
+ @Override
+ public Instant timestamp() {
+ if (inputTimestamp == null) {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input timestamp when none was available");
+ }
+ return inputTimestamp;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input windows when none were available");
+ }
+ });
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
+ return WindowedValue.of(output, timestamp, windows, pane);
+ }
+
+ /**
+ * Reads {@code view} in the side-input window that the view's own WindowFn maps
+ * {@code mainInputWindow} to.
+ *
+ * @throws IllegalArgumentException if the side input reader does not contain {@code view}
+ */
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ if (!sideInputReader.contains(view)) {
+ throw new IllegalArgumentException("calling sideInput() with unknown view");
+ }
+ BoundedWindow sideInputWindow =
+ view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+ return sideInputReader.get(view, sideInputWindow);
+ }
+
+ /** Wraps the arguments via {@code makeWindowedValue} and emits to the main output. */
+ void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+ }
+
+ /** Emits to the main output tag and, if a step context is present, notes the output there. */
+ void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+ outputManager.output(mainOutputTag, windowedElem);
+ if (stepContext != null) {
+ stepContext.noteOutput(windowedElem);
+ }
+ }
+
+ /** Wraps the arguments via {@code makeWindowedValue} and emits to {@code tag}. */
+ protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
+ T output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+ }
+
+ /**
+ * Emits to {@code tag} and, if a step context is present, notes the side output there.
+ *
+ * @throws IllegalArgumentException if {@code tag} is new and {@link #MAX_SIDE_OUTPUTS} tags
+ * are already known
+ */
+ protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+ if (!outputTags.contains(tag)) {
+ // This tag wasn't declared nor was it seen before during this execution.
+ // Thus, this must be a new, undeclared and unconsumed output.
+ // To prevent likely user errors, enforce the limit on the number of side
+ // outputs.
+ if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+ throw new IllegalArgumentException(
+ "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+ }
+ outputTags.add(tag);
+ }
+
+ outputManager.output(tag, windowedElem);
+ if (stepContext != null) {
+ stepContext.noteSideOutput(tag, windowedElem);
+ }
+ }
+
+ // Following implementations of output, outputWithTimestamp, and sideOutput
+ // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
+ // ProcessContext's versions in DoFn.processElement.
+ @Override
+ public void output(OutputT output) {
+ outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+ sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+ sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+ }
+
+ /**
+ * Returns {@code "<stepName>-<userName>"}, prefixed with {@code "user-"} unless the DoFn's
+ * class is annotated {@link SystemDoFnInternal}.
+ */
+ // NOTE(review): stepContext is dereferenced here without the null check used by the output
+ // paths above; a null stepContext would throw NullPointerException when creating aggregators.
+ private String generateInternalAggregatorName(String userName) {
+ boolean system = fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+ return (system ? "" : "user-") + stepContext.getStepName() + "-" + userName;
+ }
+
+ /**
+ * Creates a {@link CounterAggregator} under the internal name for {@code name}, registered
+ * through {@link #addCounterMutator}.
+ *
+ * @throws NullPointerException if {@code combiner} is null
+ */
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ Preconditions.checkNotNull(combiner,
+ "Combiner passed to createAggregator cannot be null");
+ return new CounterAggregator<>(generateInternalAggregatorName(name),
+ combiner, addCounterMutator);
+ }
+ }
+
+  /**
+   * Creates the {@code DoFn.ProcessContext} through which {@link DoFn#processElement} sees
+   * {@code elem}; it is backed by this runner's shared {@link #context}.
+   */
+  protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
+    DoFnProcessContext<InputT, OutputT> processContext =
+        new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+    return processContext;
+  }
+
+  /**
+   * Wraps {@code t} via {@link UserCodeException#wrapIf}, passing {@code !isSystemDoFn()} as the
+   * wrap condition, and throws the result.
+   *
+   * <p>This method never returns normally. NOTE(review): the {@link RuntimeException} return type
+   * presumably exists so call sites can write {@code throw wrapUserCodeException(t);} — confirm
+   * with callers before changing it to a plain {@code return}.
+   */
+  protected RuntimeException wrapUserCodeException(Throwable t) {
+    boolean wrapCondition = !isSystemDoFn();
+    RuntimeException wrapped = UserCodeException.wrapIf(wrapCondition, t);
+    throw wrapped;
+  }
+
+  /** Whether the wrapped DoFn's class is annotated with {@link SystemDoFnInternal}. */
+  private boolean isSystemDoFn() {
+    Class<?> fnClass = fn.getClass();
+    return fnClass.isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+ /**
+ * A concrete implementation of {@code DoFn.ProcessContext} used for
+ * running a {@link DoFn} over a single element.
+ *
+ * @param <InputT> the type of the DoFn's (main) input elements
+ * @param <OutputT> the type of the DoFn's (main) output elements
+ */
+ static class DoFnProcessContext<InputT, OutputT>
+ extends DoFn<InputT, OutputT>.ProcessContext {
+
+
+ final DoFn<InputT, OutputT> fn;
+ final DoFnContext<InputT, OutputT> context;
+ final WindowedValue<InputT> windowedValue;
+
+ public DoFnProcessContext(DoFn<InputT, OutputT> fn,
+ DoFnContext<InputT, OutputT> context,
+ WindowedValue<InputT> windowedValue) {
+ fn.super();
+ this.fn = fn;
+ this.context = context;
+ this.windowedValue = windowedValue;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public InputT element() {
+ return windowedValue.getValue();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
+ Iterator<? extends BoundedWindow> windowIter = windows().iterator();
+ BoundedWindow window;
+ if (!windowIter.hasNext()) {
+ if (context.windowFn instanceof GlobalWindows) {
+ // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+ // without windows
+ window = GlobalWindow.INSTANCE;
+ } else {
+ throw new IllegalStateException(
+ "sideInput called when main input element is not in any windows");
+ }
+ } else {
+ window = windowIter.next();
+ if (windowIter.hasNext()) {
+ throw new IllegalStateException(
+ "sideInput called when main input element is in multiple windows");
+ }
+ }
+ return context.sideInput(view, window);
+ }
+
+ @Override
+ public BoundedWindow window() {
+ if (!(fn instanceof RequiresWindowAccess)) {
+ throw new UnsupportedOperationException(
+ "window() is only available in the context of a DoFn marked as RequiresWindow.");
+ }
+ return Iterables.getOnlyElement(windows());
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return windowedValue.getPane();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.outputWindowedValue(windowedValue.withValue(output));
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ checkTimestamp(timestamp);
+ context.outputWindowedValue(output, timestamp,
+ windowedValue.getWindows(), windowedValue.getPane());
+ }
+
+ void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ context.outputWindowedValue(output, timestamp, windows, pane);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+ context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+ checkTimestamp(timestamp);
+ context.sideOutputWindowedValue(
+ tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+ }
+
+ @Override
+ public Instant timestamp() {
+ return windowedValue.getTimestamp();
+ }
+
+ public Collection<? extends BoundedWindow> windows() {
+ return windowedValue.getWindows();
+ }
+
+ private void checkTimestamp(Instant timestamp) {
+ if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+ + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+ + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+ timestamp, windowedValue.getTimestamp(),
+ PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+ }
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return new WindowingInternals<InputT, OutputT>() {
+ @Override
+ public void outputWindowedValue(OutputT output, Instant timestamp,
+ Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ context.outputWindowedValue(output, timestamp, windows, pane);
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return windowedValue.getWindows();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return windowedValue.getPane();
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return context.stepContext.timerInternals();
+ }
+
+ @Override
+ public <T> void writePCollectionViewData(
+ TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data,
+ Coder<T> elemCoder) throws IOException {
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
+
+ context.stepContext.writePCollectionViewData(
+ tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
+ window(), windowCoder);
+ }
+
+ @Override
+ public StateInternals<?> stateInternals() {
+ return context.stepContext.stateInternals();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ return context.sideInput(view, mainInputWindow);
+ }
+ };
+ }
+
+ @Override
+ protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+ createAggregatorInternal(
+ String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
+ return context.createAggregatorInternal(name, combiner);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
new file mode 100644
index 0000000..648a281
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.List;
+
+/**
+ * Static utility methods that provide {@link DoFnRunner} implementations.
+ */
+public class DoFnRunners {
+  /**
+   * Information about how to create output receivers and output to them.
+   */
+  public interface OutputManager {
+    /**
+     * Outputs a single element to the receiver indicated by the given {@link TupleTag}.
+     *
+     * @param tag identifies the main or side output that receives {@code output}
+     * @param output the element to emit, together with its timestamp, windows and pane
+     */
+    <T> void output(TupleTag<T> tag, WindowedValue<T> output);
+  }
+
+  /**
+   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+   *
+   * <p>It invokes {@link DoFn#processElement} for each input.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      CounterSet.AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return new SimpleDoFnRunner<>(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        addCounterMutator,
+        windowingStrategy);
+  }
+
+  /**
+   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
+   *
+   * <p>It wraps a {@link #simpleRunner simple runner} over {@code reduceFnExecutor.asDoFn()} in a
+   * {@link LateDataDroppingDoFnRunner}, which drops elements from expired windows before they
+   * reach the underlying {@link DoFn}. Dropped elements are reported to
+   * {@code reduceFnExecutor.getDroppedDueToLatenessAggregator()}, and lateness is judged against
+   * {@code stepContext.timerInternals()}.
+   */
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
+          PipelineOptions options,
+          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
+          SideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainOutputTag,
+          List<TupleTag<?>> sideOutputTags,
+          StepContext stepContext,
+          CounterSet.AddCounterMutator addCounterMutator,
+          WindowingStrategy<?, W> windowingStrategy) {
+    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
+        simpleRunner(
+            options,
+            reduceFnExecutor.asDoFn(),
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            addCounterMutator,
+            windowingStrategy);
+    return new LateDataDroppingDoFnRunner<>(
+        simpleDoFnRunner,
+        windowingStrategy,
+        stepContext.timerInternals(),
+        reduceFnExecutor.getDroppedDueToLatenessAggregator());
+  }
+
+  /**
+   * Returns the default {@link DoFnRunner} for {@code doFn}.
+   *
+   * <p>If {@code doFn} implements {@link ReduceFnExecutor}, this returns a
+   * {@link #lateDataDroppingRunner late-data-dropping runner}. In that case the main output tag
+   * and windowing strategy are passed through unchecked raw casts, which assume that
+   * {@code doFn} consumes {@link KeyedWorkItem KeyedWorkItems} and emits {@link KV KVs}.
+   * Otherwise this returns a {@link #simpleRunner simple runner}.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    if (doFn instanceof ReduceFnExecutor) {
+      @SuppressWarnings("rawtypes")
+      ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
+      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
+          options,
+          fn,
+          sideInputReader,
+          outputManager,
+          (TupleTag) mainOutputTag,
+          sideOutputTags,
+          stepContext,
+          addCounterMutator,
+          (WindowingStrategy) windowingStrategy);
+      return runner;
+    }
+    return simpleRunner(
+        options,
+        doFn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        addCounterMutator,
+        windowingStrategy);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
new file mode 100644
index 0000000..f5de0bc
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Base {@link DoFn} that, for each key, merges windows and groups that key's elements by window,
+ * optionally combining the values.
+ *
+ * <p>Two summing aggregators are exposed to subclasses for counting dropped elements.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+@SystemDoFnInternal
+public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+  /** Name of the aggregator counting elements dropped because their window was closed. */
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+
+  /** Name of the aggregator counting elements dropped for being too late. */
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+
+  // Field initializers run in declaration order, so the closed-window aggregator is created first.
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+
+  /**
+   * Creates the default {@link GroupAlsoByWindowsDoFn}: a
+   * {@link GroupAlsoByWindowsViaOutputBufferDoFn} driven by a buffering {@link SystemReduceFn}.
+   *
+   * @param windowingStrategy the window function and trigger to use for grouping
+   * @param inputCoder coder for the input values being buffered
+   */
+  public static <K, V, W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W>
+      createDefault(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
+    GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> defaultFn =
+        new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+            windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+    return defaultFn;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..d364168
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
+ * implementation is applicable.
+ *
+ * <p>Each input element is expected to carry every value for its key, sorted by timestamp. The
+ * values are fed to a {@link ReduceFnRunner} in fixed-size chunks while a
+ * {@link BatchTimerInternals} emulates the watermark.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+
+  /** Number of values handed to the {@link ReduceFnRunner} between watermark advances. */
+  private static final int CHUNK_SIZE = 1000;
+
+  private final WindowingStrategy<?, W> strategy;
+  private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    this.strategy = windowingStrategy;
+    this.reduceFn = reduceFn;
+  }
+
+  @Override
+  public void processElement(
+      DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
+      throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
+
+    // It is the responsibility of the user of GroupAlsoByWindowsViaOutputBufferDoFn to only
+    // provide a WindowingInternals instance with the appropriate key type for StateInternals.
+    @SuppressWarnings("unchecked")
+    StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<K, InputT, OutputT, W>(
+            key,
+            strategy,
+            stateInternals,
+            timerInternals,
+            c.windowingInternals(),
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), CHUNK_SIZE);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      reduceFnRunner.processElements(chunk);
+
+      // Elements are sorted by timestamp, so nothing still to come is earlier than the first
+      // element of this chunk; advancing the input watermark to that timestamp is therefore
+      // safe, and may fire timers scheduled at or before it.
+      timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    reduceFnRunner.persist();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..9450495
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
+ * <ol>
+ * <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds
+ * the previously-implicit timestamp and window into the elements themselves, so a
+ * window-and-timestamp-unaware transform can operate on them.</li>
+ * <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
+ * and timestamps. Many window-unaware runners have such a primitive already.</li>
+ * <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables
+ * output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
+ * <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
+ * implemented as a {@link ParDo} that calls reserved internal methods.</li>
+ * </ol>
+ *
+ * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
+ * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
+ * execution strategy. Specifically:
+ *
+ * <ul>
+ * <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
+ * A streaming-style partition, with multiple elements for the same key, will not yield
+ * correct results.</li>
+ * <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
+ * for large iterables.</li>
+ * <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
+ * appropriate for runners which also do not support timers.</li>
+ * </ul>
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ private GroupByKey<K, V> gbkTransform;
+
+ public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+ this.gbkTransform = originalTransform;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+ return input
+ // Make each input element's timestamp and assigned windows
+ // explicit, in the value part.
+ .apply(new ReifyTimestampsAndWindows<K, V>())
+
+ // Group by just the key.
+ // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+ // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+ // introduced in here.
+ .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+ // Sort each key's values by timestamp. GroupAlsoByWindow requires
+ // its input to be sorted by timestamp.
+ .apply(new SortValuesByTimestamp<K, V>())
+
+ // Group each key's values by window, merging windows as needed.
+ .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+ // And update the windowing strategy as appropriate.
+ .setWindowingStrategyInternal(
+ gbkTransform.updateWindowingStrategy(windowingStrategy));
+ }
+
+ /**
+ * Runner-specific primitive that groups by key only, ignoring any window assignments. A
+ * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
+ * or evaluate this class.
+ */
+ public static class GroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+
+ @Override
+ public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+ return GroupByKey.getOutputKvCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that sorts the values associated with each key by timestamp.
+ */
+ private static class SortValuesByTimestamp<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+ @Override
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ return input
+ .apply(
+ ParDo.of(
+ new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+ K key = kvs.getKey();
+ Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+ List<WindowedValue<V>> sortedValues = new ArrayList<>();
+ for (WindowedValue<V> value : unsortedValues) {
+ sortedValues.add(value);
+ }
+ Collections.sort(
+ sortedValues,
+ new Comparator<WindowedValue<V>>() {
+ @Override
+ public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+ return e1.getTimestamp().compareTo(e2.getTimestamp());
+ }
+ });
+ c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+ }
+ }))
+ .setCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that takes a collection of timestamp-ordered
+ * values associated with each key, groups the values by window,
+ * combines windows as needed, and for each window in each key,
+ * outputs a collection of key/value-list pairs implicitly assigned
+ * to the window and with the timestamp derived from that window.
+ */
+ private static class GroupAlsoByWindow<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+ private final WindowingStrategy<?, ?> windowingStrategy;
+
+ public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PCollection<KV<K, Iterable<V>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+ IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+ (IterableCoder<WindowedValue<V>>) inputValueCoder;
+ Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+ WindowedValueCoder<V> inputIterableWindowedValueCoder =
+ (WindowedValueCoder<V>) inputIterableElementCoder;
+
+ Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+ Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
+ Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+ return input
+ .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+ .setCoder(outputKvCoder);
+ }
+
+ private <W extends BoundedWindow>
+ GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+ WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+ return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+ strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
new file mode 100644
index 0000000..4815162
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Instant;
+
+/**
+ * A customized {@link DoFnRunner} that handles late data dropping for
+ * a {@link KeyedWorkItem} input {@link DoFn}.
+ *
+ * <p>It expands windows before checking data lateness.
+ *
+ * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
+ implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
+ private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
+ private final LateDataFilter lateDataFilter;
+
+ public LateDataDroppingDoFnRunner(
+ DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TimerInternals timerInternals,
+ Aggregator<Long, Long> droppedDueToLateness) {
+ this.doFnRunner = doFnRunner;
+ lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
+ }
+
+ @Override
+ public void startBundle() {
+ doFnRunner.startBundle();
+ }
+
+ @Override
+ public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
+ Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter(
+ elem.getValue().key(), elem.getValue().elementsIterable());
+ KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem(
+ elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
+ doFnRunner.processElement(elem.withValue(keyedWorkItem));
+ }
+
+ @Override
+ public void finishBundle() {
+ doFnRunner.finishBundle();
+ }
+
+ /**
+ * It filters late data in a {@link KeyedWorkItem}.
+ */
+ @VisibleForTesting
+ static class LateDataFilter {
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final TimerInternals timerInternals;
+ private final Aggregator<Long, Long> droppedDueToLateness;
+
+ public LateDataFilter(
+ WindowingStrategy<?, ?> windowingStrategy,
+ TimerInternals timerInternals,
+ Aggregator<Long, Long> droppedDueToLateness) {
+ this.windowingStrategy = windowingStrategy;
+ this.timerInternals = timerInternals;
+ this.droppedDueToLateness = droppedDueToLateness;
+ }
+
+ /**
+ * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains
+ * non-late input elements.
+ */
+ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
+ final K key, Iterable<WindowedValue<InputT>> elements) {
+ Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform(
+ elements,
+ new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() {
+ @Override
+ public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
+ return Iterables.transform(
+ input.getWindows(),
+ new Function<BoundedWindow, WindowedValue<InputT>>() {
+ @Override
+ public WindowedValue<InputT> apply(BoundedWindow window) {
+ return WindowedValue.of(
+ input.getValue(), input.getTimestamp(), window, input.getPane());
+ }
+ });
+ }});
+
+ Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
+ Iterables.concat(windowsExpandedElements),
+ new Predicate<WindowedValue<InputT>>() {
+ @Override
+ public boolean apply(WindowedValue<InputT> input) {
+ BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+ if (canDropDueToExpiredWindow(window)) {
+ // The element is too late for this window.
+ droppedDueToLateness.addValue(1L);
+ WindowTracing.debug(
+ "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ + "since too far behind inputWatermark:{}; outputWatermark:{}",
+ input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ return false;
+ } else {
+ return true;
+ }
+ }
+ });
+ return nonLateElements;
+ }
+
+ /** Is {@code window} expired w.r.t. the garbage collection watermark? */
+ private boolean canDropDueToExpiredWindow(BoundedWindow window) {
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+ return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
new file mode 100644
index 0000000..e809c24
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
+ * their last triggering.
+ *
+ * @param <K> The type of key whose per-window state is accessed.
+ * @param <W> The kind of windows being tracked.
+ */
+public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
+
+  /**
+   * Creates the implementation suited to the accumulation mode of {@code strategy}.
+   *
+   * <p>For {@link AccumulationMode#DISCARDING_FIRED_PANES} the emptiness of {@code reduceFn}'s own
+   * state is used directly, so no additional state is kept. For any other mode, a separate count
+   * of additions is kept in system state.
+   */
+  static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
+      WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
+    if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
+      return new DiscardingModeNonEmptyPanes<>(reduceFn);
+    } else {
+      return new GeneralNonEmptyPanes<>();
+    }
+  }
+
+  /**
+   * Record that some content has been added to the window accessed through {@code state}, and
+   * therefore the current pane is not empty.
+   */
+  public abstract void recordContent(StateAccessor<K> state);
+
+  /**
+   * Record that the current pane for the window accessed through {@code state} is empty.
+   */
+  public abstract void clearPane(StateAccessor<K> state);
+
+  /**
+   * Return a {@link ReadableState} whose value is {@code true} if the current pane for the window
+   * accessed through {@code state} is empty.
+   */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> state);
+
+  /**
+   * Prefetch in preparation for merging.
+   */
+  public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
+
+  /**
+   * Eagerly merge backing state.
+   */
+  public abstract void onMerge(MergingStateAccessor<K, W> state);
+
+  /**
+   * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
+   * presence of data in the accumulation buffer to record non-empty panes.
+   */
+  private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    /** Answers emptiness queries; it is already tracking the pane's contents. */
+    private final ReduceFn<K, ?, ?, W> reduceFn;
+
+    private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
+      this.reduceFn = reduceFn;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return reduceFn.isEmpty(state);
+    }
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+  }
+
+  /**
+   * An implementation of {@code NonEmptyPanes} for general use. Keeps a per-window sum of
+   * additions in system state; the pane is considered empty when that state is empty.
+   */
+  private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    /** System state summing one per recorded addition to the current pane. */
+    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+        PANE_ADDITIONS_TAG =
+            StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+                "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).add(1L);
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).clear();
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return state.access(PANE_ADDITIONS_TAG).isEmpty();
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
+      StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> state) {
+      StateMerging.mergeCombiningValues(state, PANE_ADDITIONS_TAG);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
new file mode 100644
index 0000000..5e08031
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Instant;
+
+/**
+ * Determine the timing and other properties of a new pane for a given computation, key and window.
+ * Incorporates any previous pane, the position of the input watermark relative to the end of the
+ * window (which distinguishes EARLY from ON_TIME panes, as an on-time {@link AfterWatermark}
+ * firing would), and the position of the output watermark relative to the end of the window
+ * (which can force a pane to be LATE).
+ */
+public class PaneInfoTracker {
+  /** Source of the input and output watermarks used to classify each pane. */
+  private final TimerInternals timerInternals;
+
+  public PaneInfoTracker(TimerInternals timerInternals) {
+    this.timerInternals = timerInternals;
+  }
+
+  /** System state holding the pane info last stored via {@link #storeCurrentPaneInfo}. */
+  @VisibleForTesting
+  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
+
+  /** Clears the stored pane info for the window accessed through {@code state}. */
+  public void clear(StateAccessor<?> state) {
+    state.access(PANE_INFO_TAG).clear();
+  }
+
+  /**
+   * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
+   * info includes the timing for the pane, whose calculation is quite subtle.
+   *
+   * <p>The previous pane is read lazily: {@link ReadableState#readLater()} prefetches it, and the
+   * watermarks are sampled when {@link ReadableState#read()} is invoked, not when this method is
+   * called.
+   *
+   * @param isFinal should be {@code true} only if the triggering machinery can guarantee
+   *     no further firings for this key and window
+   */
+  public ReadableState<PaneInfo> getNextPaneInfo(
+      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
+    final Object key = context.key();
+    final ReadableState<PaneInfo> previousPaneFuture =
+        context.state().access(PaneInfoTracker.PANE_INFO_TAG);
+    final Instant windowMaxTimestamp = context.window().maxTimestamp();
+
+    return new ReadableState<PaneInfo>() {
+      @Override
+      public ReadableState<PaneInfo> readLater() {
+        previousPaneFuture.readLater();
+        return this;
+      }
+
+      @Override
+      public PaneInfo read() {
+        PaneInfo previousPane = previousPaneFuture.read();
+        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
+      }
+    };
+  }
+
+  /**
+   * Stores {@code currentPane} so that the next {@link #getNextPaneInfo} call for the same key and
+   * window treats it as the previous pane.
+   */
+  public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
+    context.state().access(PANE_INFO_TAG).write(currentPane);
+  }
+
+  /**
+   * Computes the {@link PaneInfo} for the next pane of a window.
+   *
+   * @param key the key, used only for tracing
+   * @param windowMaxTimestamp the maximum timestamp of the window
+   * @param previousPane the previously stored pane, or {@code null} if this is the first pane
+   * @param isFinal whether the resulting pane is marked as the last one
+   * @throws IllegalStateException if the transition from the previous pane's timing is invalid, or
+   *     the previous pane was already marked as last
+   */
+  private PaneInfo describePane(
+      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
+    boolean isFirst = previousPane == null;
+    Timing previousTiming = isFirst ? null : previousPane.getTiming();
+    long index = isFirst ? 0 : previousPane.getIndex() + 1;
+    long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // True if it is not possible to assign the element representing this pane a timestamp
+    // which will make an ON_TIME pane for any following computation.
+    // Ie true if the element's latest possible timestamp is before the current output watermark.
+    // The output watermark may be null, in which case nothing is late for output.
+    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
+
+    // True if all emitted panes (if any) were EARLY panes.
+    // Once the ON_TIME pane has fired, all following panes must be considered LATE even
+    // if the output watermark is behind the end of the window.
+    boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
+
+    // True if the input watermark hasn't passed the window's max timestamp.
+    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
+
+    Timing timing;
+    if (isLateForOutput || !onlyEarlyPanesSoFar) {
+      // The output watermark has already passed the end of this window, or we have already
+      // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
+      // consider this pane LATE.
+      timing = Timing.LATE;
+    } else if (isEarlyForInput) {
+      // This is an EARLY firing. EARLY panes carry no non-speculative index.
+      timing = Timing.EARLY;
+      nonSpeculativeIndex = -1;
+    } else {
+      // This is the unique ON_TIME firing for the window.
+      timing = Timing.ON_TIME;
+    }
+
+    WindowTracing.debug(
+        "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
+        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
+
+    if (previousPane != null) {
+      // Timing transitions should follow EARLY* ON_TIME? LATE*
+      switch (previousTiming) {
+        case EARLY:
+          Preconditions.checkState(
+              timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
+              "EARLY cannot transition to %s", timing);
+          break;
+        case ON_TIME:
+          Preconditions.checkState(
+              timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
+          break;
+        case LATE:
+          Preconditions.checkState(timing == Timing.LATE, "LATE cannot transition to %s", timing);
+          break;
+        case UNKNOWN:
+          break;
+      }
+      Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
+    }
+
+    return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
+  }
+}