You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:43:58 UTC
[11/11] incubator-beam git commit: Put classes in runners-core
package into runners.core namespace
Put classes in runners-core package into runners.core namespace
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a62e5018
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a62e5018
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a62e5018
Branch: refs/heads/master
Commit: a62e5018d6fa0a4e6ed05b872a53a4eb36415d1d
Parents: 3ef7a35
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 25 14:57:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:35:39 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/AssignWindows.java | 46 +
.../beam/runners/core/AssignWindowsDoFn.java | 79 +
.../beam/runners/core/BatchTimerInternals.java | 140 ++
.../apache/beam/runners/core/DoFnRunner.java | 65 +
.../beam/runners/core/DoFnRunnerBase.java | 559 +++++++
.../apache/beam/runners/core/DoFnRunners.java | 148 ++
.../runners/core/ElementByteSizeObservable.java | 44 +
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 +-
.../runners/core/GroupAlsoByWindowsDoFn.java | 66 +
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 100 ++
.../core/GroupByKeyViaGroupByKeyOnly.java | 271 ++++
.../core/LateDataDroppingDoFnRunner.java | 151 ++
.../apache/beam/runners/core/NonEmptyPanes.java | 151 ++
.../beam/runners/core/PaneInfoTracker.java | 158 ++
.../beam/runners/core/PeekingReiterator.java | 100 ++
.../core/PushbackSideInputDoFnRunner.java | 116 ++
.../org/apache/beam/runners/core/ReduceFn.java | 130 ++
.../runners/core/ReduceFnContextFactory.java | 499 ++++++
.../beam/runners/core/ReduceFnRunner.java | 993 ++++++++++++
.../beam/runners/core/SimpleDoFnRunner.java | 58 +
.../beam/runners/core/SystemReduceFn.java | 139 ++
.../apache/beam/runners/core/TriggerRunner.java | 247 +++
.../apache/beam/runners/core/WatermarkHold.java | 539 +++++++
.../org/apache/beam/sdk/util/AssignWindows.java | 46 -
.../apache/beam/sdk/util/AssignWindowsDoFn.java | 77 -
.../beam/sdk/util/BatchTimerInternals.java | 137 --
.../org/apache/beam/sdk/util/DoFnRunner.java | 63 -
.../apache/beam/sdk/util/DoFnRunnerBase.java | 551 -------
.../org/apache/beam/sdk/util/DoFnRunners.java | 143 --
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 63 -
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 97 --
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 268 ----
.../sdk/util/LateDataDroppingDoFnRunner.java | 145 --
.../org/apache/beam/sdk/util/NonEmptyPanes.java | 150 --
.../apache/beam/sdk/util/PaneInfoTracker.java | 156 --
.../sdk/util/PushbackSideInputDoFnRunner.java | 113 --
.../java/org/apache/beam/sdk/util/ReduceFn.java | 128 --
.../beam/sdk/util/ReduceFnContextFactory.java | 493 ------
.../apache/beam/sdk/util/ReduceFnRunner.java | 983 ------------
.../apache/beam/sdk/util/SimpleDoFnRunner.java | 55 -
.../apache/beam/sdk/util/SystemReduceFn.java | 138 --
.../org/apache/beam/sdk/util/TriggerRunner.java | 241 ---
.../org/apache/beam/sdk/util/WatermarkHold.java | 536 -------
.../util/common/ElementByteSizeObservable.java | 42 -
.../beam/sdk/util/common/PeekingReiterator.java | 99 --
.../beam/sdk/util/common/package-info.java | 20 -
.../org/apache/beam/sdk/util/package-info.java | 20 -
.../runners/core/BatchTimerInternalsTest.java | 118 ++
.../core/GroupAlsoByWindowsProperties.java | 660 ++++++++
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 110 ++
.../core/LateDataDroppingDoFnRunnerTest.java | 117 ++
.../core/PushbackSideInputDoFnRunnerTest.java | 235 +++
.../beam/runners/core/ReduceFnRunnerTest.java | 1446 ++++++++++++++++++
.../beam/runners/core/ReduceFnTester.java | 796 ++++++++++
.../beam/runners/core/SimpleDoFnRunnerTest.java | 88 ++
.../beam/sdk/util/BatchTimerInternalsTest.java | 117 --
.../sdk/util/GroupAlsoByWindowsProperties.java | 658 --------
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 109 --
.../util/LateDataDroppingDoFnRunnerTest.java | 114 --
.../util/PushbackSideInputDoFnRunnerTest.java | 231 ---
.../beam/sdk/util/ReduceFnRunnerTest.java | 1442 -----------------
.../apache/beam/sdk/util/ReduceFnTester.java | 784 ----------
.../beam/sdk/util/SimpleDoFnRunnerTest.java | 84 -
.../GroupAlsoByWindowEvaluatorFactory.java | 8 +-
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +-
.../beam/runners/direct/ParDoEvaluator.java | 8 +-
.../direct/UncommittedBundleOutputManager.java | 2 +-
.../FlinkStreamingTransformTranslators.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 6 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../apache/beam/runners/spark/SparkRunner.java | 2 +-
.../spark/translation/TransformTranslator.java | 10 +-
.../streaming/StreamingTransformTranslator.java | 2 +-
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../org/apache/beam/sdk/util/BitSetCoder.java | 2 +-
75 files changed, 8395 insertions(+), 8332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
new file mode 100644
index 0000000..f2387f5
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
+ * {@link PCollection} to windows according to the provided {@link WindowFn}.
+ *
+ * <p>The window assignment itself is performed by an {@link AssignWindowsDoFn} applied via
+ * {@link ParDo}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+public class AssignWindows<T, W extends BoundedWindow>
+    extends PTransform<PCollection<T>, PCollection<T>> {
+
+  /** The {@link WindowFn} used to assign windows; fixed at construction time. */
+  private final WindowFn<? super T, W> fn;
+
+  /**
+   * Creates a transform that assigns windows to its input using {@code fn}.
+   *
+   * @param fn the {@link WindowFn} applied to each input element
+   */
+  public AssignWindows(WindowFn<? super T, W> fn) {
+    this.fn = fn;
+  }
+
+  /**
+   * Applies an {@link AssignWindowsDoFn} wrapping {@link #fn}, under the step name
+   * {@code "AssignWindows"}, to {@code input}.
+   */
+  @Override
+  public PCollection<T> apply(PCollection<T> input) {
+    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
new file mode 100644
index 0000000..0eb1667
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
+ * provided {@link WindowFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+@SystemDoFnInternal
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
+ implements RequiresWindowAccess {
+ private WindowFn<? super T, W> fn;
+
+ public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
+ this.fn =
+ checkNotNull(
+ fn,
+ "%s provided to %s cannot be null",
+ WindowFn.class.getSimpleName(),
+ AssignWindowsDoFn.class.getSimpleName());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processElement(final ProcessContext c) throws Exception {
+ Collection<W> windows =
+ ((WindowFn<T, W>) fn).assignWindows(
+ ((WindowFn<T, W>) fn).new AssignContext() {
+ @Override
+ public T element() {
+ return c.element();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return c.timestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return Iterables.getOnlyElement(c.windowingInternals().windows());
+ }
+ });
+
+ c.windowingInternals()
+ .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
new file mode 100644
index 0000000..829dbde
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+
+import org.joda.time.Instant;
+
+/**
+ * TimerInternals that uses priority queues to manage the timers that are ready to fire.
+ */
+public class BatchTimerInternals implements TimerInternals {
+ /** Set of timers that are scheduled used for deduplicating timers. */
+ private Set<TimerData> existingTimers = new HashSet<>();
+
+ // Keep these queues separate so we can advance over them separately.
+ private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+ private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+ private Instant inputWatermarkTime;
+ private Instant processingTime;
+
+ private PriorityQueue<TimerData> queue(TimeDomain domain) {
+ return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
+ }
+
+ public BatchTimerInternals(Instant processingTime) {
+ this.processingTime = processingTime;
+ this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ @Override
+ public void setTimer(TimerData timer) {
+ if (existingTimers.add(timer)) {
+ queue(timer.getDomain()).add(timer);
+ }
+ }
+
+ @Override
+ public void deleteTimer(TimerData timer) {
+ existingTimers.remove(timer);
+ queue(timer.getDomain()).remove(timer);
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return processingTime;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
+ * is already complete.
+ */
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return inputWatermarkTime;
+ }
+
+ @Override
+ @Nullable
+ public Instant currentOutputWatermarkTime() {
+ // The output watermark is always undefined in batch mode.
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("watermarkTimers", watermarkTimers)
+ .add("processingTimers", processingTimers)
+ .toString();
+ }
+
+ public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
+ throws Exception {
+ checkState(!newInputWatermark.isBefore(inputWatermarkTime),
+ "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
+ newInputWatermark);
+ inputWatermarkTime = newInputWatermark;
+ advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
+ }
+
+ public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
+ throws Exception {
+ checkState(!newProcessingTime.isBefore(processingTime),
+ "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
+ processingTime = newProcessingTime;
+ advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
+ }
+
+ private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
+ throws Exception {
+ PriorityQueue<TimerData> timers = queue(domain);
+ boolean shouldFire = false;
+
+ do {
+ TimerData timer = timers.peek();
+ // Timers fire if the new time is ahead of the timer
+ shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
+ if (shouldFire) {
+ // Remove before firing, so that if the trigger adds another identical
+ // timer we don't remove it.
+ timers.remove();
+ runner.onTimer(timer);
+ }
+ } while (shouldFire);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
new file mode 100644
index 0000000..f4c8eea
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A wrapper interface that represents the execution of a {@link OldDoFn}.
+ *
+ * @param <InputT> the type of the main input elements
+ * @param <OutputT> the type of the main output elements
+ */
+public interface DoFnRunner<InputT, OutputT> {
+  /**
+   * Prepares and calls {@link OldDoFn#startBundle}.
+   */
+  void startBundle();
+
+  /**
+   * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current
+   * element.
+   *
+   * @param elem the windowed input element to process
+   */
+  void processElement(WindowedValue<InputT> elem);
+
+  /**
+   * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as
+   * flushing in-memory states.
+   */
+  void finishBundle();
+
+  /**
+   * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
+   *
+   * @param <K> the key type of the {@link KeyedWorkItem} inputs and {@link KV} outputs
+   * @param <InputT> the value type of the {@link KeyedWorkItem} inputs
+   * @param <OutputT> the value type of the {@link KV} outputs
+   * @param <W> not referenced by the methods declared here; presumably the window type
+   */
+  interface ReduceFnExecutor<K, InputT, OutputT, W> {
+    /**
+     * Gets this object as a {@link OldDoFn}.
+     *
+     * <p>Most implementors of this interface are expected to be {@link OldDoFn} instances, and
+     * will return themselves.
+     */
+    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
+
+    /**
+     * Returns an aggregator that tracks elements that are dropped due to being late.
+     */
+    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java
new file mode 100644
index 0000000..71472da
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+/**
+ * A base implementation of {@link DoFnRunner}.
+ *
+ * <p> Sub-classes should override {@link #invokeProcessElement}.
+ */
+public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+ /** The {@link OldDoFn} being run. */
+ public final OldDoFn<InputT, OutputT> fn;
+
+ /** The context used for running the {@link OldDoFn}. */
+ public final DoFnContext<InputT, OutputT> context;
+
+  /**
+   * Builds the runner and the shared {@link DoFnContext} used for bundle-level calls.
+   *
+   * <p>When {@code windowingStrategy} is null the context is created without a {@link WindowFn}.
+   */
+  protected DoFnRunnerBase(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    WindowFn<?, ?> contextWindowFn =
+        (windowingStrategy != null) ? windowingStrategy.getWindowFn() : null;
+    this.fn = fn;
+    this.context = new DoFnContext<>(
+        options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags,
+        stepContext, aggregatorFactory, contextWindowFn);
+  }
+
+  /**
+   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
+   * contexts such as the {@code DirectRunner}.
+   *
+   * <p>Outputs are grouped by {@link TupleTag} and kept in the order they were received.
+   */
+  public static class ListOutputManager implements OutputManager {
+
+    /** Per-tag output lists. The map itself is never replaced; only its contents change. */
+    private final Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
+
+    /** Appends {@code output} to the list for {@code tag}, creating that list on first use. */
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+
+      if (outputList == null) {
+        outputList = Lists.newArrayList();
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        List<WindowedValue<?>> untypedList = (List) outputList;
+        outputLists.put(tag, untypedList);
+      }
+
+      outputList.add(output);
+    }
+
+    /**
+     * Returns the outputs recorded for {@code tag}, or an immutable empty list if none were.
+     *
+     * <p>When outputs exist, the returned list is the live internal list, so later calls to
+     * {@link #output} for the same tag are visible through it.
+     */
+    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+      // Safe cast by design, inexpressible in Java without rawtypes
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
+    }
+  }
+
+  /**
+   * Invokes {@link OldDoFn#startBundle} with the shared context. Anything thrown is passed
+   * through {@link #wrapUserCodeException}.
+   */
+  @Override
+  public void startBundle() {
+    try {
+      fn.startBundle(context);
+    } catch (Throwable userFailure) {
+      // startBundle may run user code, so attribute the failure accordingly.
+      throw wrapUserCodeException(userFailure);
+    }
+  }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ if (elem.getWindows().size() <= 1
+ || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+ && context.sideInputReader.isEmpty())) {
+ invokeProcessElement(elem);
+ } else {
+ // We could modify the windowed value (and the processContext) to
+ // avoid repeated allocations, but this is more straightforward.
+ for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+ invokeProcessElement(windowedValue);
+ }
+ }
+ }
+
+  /**
+   * Invokes {@link OldDoFn#processElement} after any pre-processing has been done in
+   * {@link DoFnRunnerBase#processElement}.
+   *
+   * <p>This may be called once per window for a single input element when
+   * {@link DoFnRunnerBase#processElement} explodes it into its windows.
+   *
+   * @param elem the (possibly single-window) element to process
+   */
+  protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
+
+  /**
+   * Invokes {@link OldDoFn#finishBundle} with the shared context. Anything thrown is passed
+   * through {@link #wrapUserCodeException}.
+   */
+  @Override
+  public void finishBundle() {
+    try {
+      fn.finishBundle(context);
+    } catch (Throwable userFailure) {
+      // finishBundle may run user code, so attribute the failure accordingly.
+      throw wrapUserCodeException(userFailure);
+    }
+  }
+
+ /**
+ * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
+ *
+ * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+ * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+ */
+ private static class DoFnContext<InputT, OutputT>
+ extends OldDoFn<InputT, OutputT>.Context {
+ private static final int MAX_SIDE_OUTPUTS = 1000;
+
+ final PipelineOptions options;
+ final OldDoFn<InputT, OutputT> fn;
+ final SideInputReader sideInputReader;
+ final OutputManager outputManager;
+ final TupleTag<OutputT> mainOutputTag;
+ final StepContext stepContext;
+ final AggregatorFactory aggregatorFactory;
+ final WindowFn<?, ?> windowFn;
+
+ /**
+ * The set of known output tags, some of which may be undeclared, so we can throw an
+ * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+ */
+ private Set<TupleTag<?>> outputTags;
+
+ public DoFnContext(PipelineOptions options,
+ OldDoFn<InputT, OutputT> fn,
+ SideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ StepContext stepContext,
+ AggregatorFactory aggregatorFactory,
+ WindowFn<?, ?> windowFn) {
+ fn.super();
+ this.options = options;
+ this.fn = fn;
+ this.sideInputReader = sideInputReader;
+ this.outputManager = outputManager;
+ this.mainOutputTag = mainOutputTag;
+ this.outputTags = Sets.newHashSet();
+
+ outputTags.add(mainOutputTag);
+ for (TupleTag<?> sideOutputTag : sideOutputTags) {
+ outputTags.add(sideOutputTag);
+ }
+
+ this.stepContext = stepContext;
+ this.aggregatorFactory = aggregatorFactory;
+ this.windowFn = windowFn;
+ super.setupDelegateAggregators();
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+ T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+ final Instant inputTimestamp = timestamp;
+
+ if (timestamp == null) {
+ timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ if (windows == null) {
+ try {
+ // The windowFn can never succeed at accessing the element, so its type does not
+ // matter here
+ @SuppressWarnings("unchecked")
+ WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+ windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
+ @Override
+ public Object element() {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input element when none was available");
+ }
+
+ @Override
+ public Instant timestamp() {
+ if (inputTimestamp == null) {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input timestamp when none was available");
+ }
+ return inputTimestamp;
+ }
+
+ @Override
+ public W window() {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input windows when none were available");
+ }
+ });
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
+ return WindowedValue.of(output, timestamp, windows, pane);
+ }
+
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ if (!sideInputReader.contains(view)) {
+ throw new IllegalArgumentException("calling sideInput() with unknown view");
+ }
+ BoundedWindow sideInputWindow =
+ view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+ return sideInputReader.get(view, sideInputWindow);
+ }
+
+ void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+ }
+
+ void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+ outputManager.output(mainOutputTag, windowedElem);
+ if (stepContext != null) {
+ stepContext.noteOutput(windowedElem);
+ }
+ }
+
+ protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
+ T output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+ }
+
+ protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+ if (!outputTags.contains(tag)) {
+ // This tag wasn't declared nor was it seen before during this execution.
+ // Thus, this must be a new, undeclared and unconsumed output.
+ // To prevent likely user errors, enforce the limit on the number of side
+ // outputs.
+ if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+ throw new IllegalArgumentException(
+ "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+ }
+ outputTags.add(tag);
+ }
+
+ outputManager.output(tag, windowedElem);
+ if (stepContext != null) {
+ stepContext.noteSideOutput(tag, windowedElem);
+ }
+ }
+
+ // Following implementations of output, outputWithTimestamp, and sideOutput
+ // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
+ // ProcessContext's versions in OldDoFn.processElement.
+ @Override
+ public void output(OutputT output) {
+ outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+ sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+ sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
+ return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
+ }
+ }
+
+  /**
+   * Creates a fresh {@link OldDoFn.ProcessContext} bound to {@code elem}, backed by this
+   * runner's fn and shared context.
+   */
+  protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+      WindowedValue<InputT> elem) {
+    DoFnProcessContext<InputT, OutputT> processContext =
+        new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+    return processContext;
+  }
+
+  /**
+   * Converts {@code t} with {@link UserCodeException#wrapIf}, marking it as user code unless
+   * the fn is a system fn, and always throws the result.
+   *
+   * <p>The declared return type lets callers write {@code throw wrapUserCodeException(t)} so
+   * the compiler sees the exit; the method itself never returns normally.
+   */
+  protected RuntimeException wrapUserCodeException(Throwable t) {
+    RuntimeException wrapped = UserCodeException.wrapIf(!isSystemDoFn(), t);
+    throw wrapped;
+  }
+
+  /** Returns whether the wrapped fn's class carries the {@link SystemDoFnInternal} annotation. */
+  private boolean isSystemDoFn() {
+    Class<?> fnClass = fn.getClass();
+    return fnClass.isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+  /**
+   * A concrete implementation of {@link OldDoFn.ProcessContext} used for
+   * running a {@link OldDoFn} over a single element.
+   *
+   * <p>Plain {@code output}/{@code sideOutput} calls re-wrap the new value with
+   * {@code windowedValue.withValue(...)}. The {@code ...WithTimestamp} variants keep the input's
+   * windows and pane, and validate the requested timestamp against
+   * {@link OldDoFn#getAllowedTimestampSkew()}.
+   *
+   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+   */
+  static class DoFnProcessContext<InputT, OutputT>
+      extends OldDoFn<InputT, OutputT>.ProcessContext {
+
+    final OldDoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    final WindowedValue<InputT> windowedValue;
+
+    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
+        DoFnContext<InputT, OutputT> context,
+        WindowedValue<InputT> windowedValue) {
+      // ProcessContext is an inner class of OldDoFn, so it must be bound to the fn instance.
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.windowedValue = windowedValue;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public InputT element() {
+      return windowedValue.getValue();
+    }
+
+    /**
+     * Reads {@code view} for the single window the current element belongs to.
+     *
+     * <p>If the element has no windows and the window fn is {@link GlobalWindows}, the global
+     * window is used instead.
+     *
+     * @throws IllegalStateException if the element is in no window (and the window fn is not
+     *     {@link GlobalWindows}), or if it is in more than one window
+     */
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
+      BoundedWindow window;
+      if (!windowIter.hasNext()) {
+        if (context.windowFn instanceof GlobalWindows) {
+          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+          // without windows
+          window = GlobalWindow.INSTANCE;
+        } else {
+          throw new IllegalStateException(
+              "sideInput called when main input element is not in any windows");
+        }
+      } else {
+        window = windowIter.next();
+        if (windowIter.hasNext()) {
+          throw new IllegalStateException(
+              "sideInput called when main input element is in multiple windows");
+        }
+      }
+      return context.sideInput(view, window);
+    }
+
+    /**
+     * Returns the element's only window.
+     *
+     * @throws UnsupportedOperationException if the fn is not marked {@link RequiresWindowAccess}
+     */
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof RequiresWindowAccess)) {
+        // The trailing space after "as" was previously missing, yielding "asRequiresWindowAccess".
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a OldDoFn marked as "
+            + "RequiresWindowAccess.");
+      }
+      return Iterables.getOnlyElement(windows());
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return windowedValue.getPane();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.outputWindowedValue(windowedValue.withValue(output));
+    }
+
+    /**
+     * Emits {@code output} at {@code timestamp} in the input element's windows and pane.
+     *
+     * @throws NullPointerException if {@code timestamp} is null
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the allowed skew permits
+     */
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkNotNull(timestamp, "Timestamp passed to outputWithTimestamp cannot be null");
+      checkTimestamp(timestamp);
+      context.outputWindowedValue(output, timestamp,
+          windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      context.outputWindowedValue(output, timestamp, windows, pane);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+    }
+
+    /**
+     * Emits {@code output} at {@code timestamp} to the side output {@code tag}, in the input
+     * element's windows and pane.
+     *
+     * @throws NullPointerException if {@code tag} or {@code timestamp} is null
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the allowed skew permits
+     */
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+      checkNotNull(timestamp, "Timestamp passed to sideOutputWithTimestamp cannot be null");
+      checkTimestamp(timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    @Override
+    public Instant timestamp() {
+      return windowedValue.getTimestamp();
+    }
+
+    public Collection<? extends BoundedWindow> windows() {
+      return windowedValue.getWindows();
+    }
+
+    /**
+     * Rejects output timestamps earlier than the input timestamp minus the fn's allowed skew.
+     */
+    private void checkTimestamp(Instant timestamp) {
+      // A skew of Long.MAX_VALUE millis is treated as unbounded. Joda's Instant.minus uses
+      // overflow-checked arithmetic, so subtracting such a skew from a pre-epoch input timestamp
+      // would throw ArithmeticException rather than permit the output. The lower-bound check is
+      // therefore skipped for unbounded skew.
+      if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
+          && timestamp.isBefore(
+              windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+        throw new IllegalArgumentException(String.format(
+            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+            timestamp, windowedValue.getTimestamp(),
+            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+      }
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
+        @Override
+        public void outputWindowedValue(OutputT output, Instant timestamp,
+            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          context.outputWindowedValue(output, timestamp, windows, pane);
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return windowedValue.getWindows();
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return windowedValue.getPane();
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return context.stepContext.timerInternals();
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag,
+            Iterable<WindowedValue<T>> data,
+            Coder<T> elemCoder) throws IOException {
+          @SuppressWarnings("unchecked")
+          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
+
+          // window() resolves to the enclosing DoFnProcessContext#window(), which requires the
+          // fn to implement RequiresWindowAccess.
+          context.stepContext.writePCollectionViewData(
+              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
+              window(), windowCoder);
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          return context.stepContext.stateInternals();
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          return context.sideInput(view, mainInputWindow);
+        }
+      };
+    }
+
+    @Override
+    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+        createAggregatorInternal(
+            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
new file mode 100644
index 0000000..7726374
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Factory methods for the standard {@link DoFnRunner} implementations.
+ */
+public class DoFnRunners {
+  /**
+   * Routes elements emitted by a {@link DoFnRunner} to the receiver for each output tag.
+   */
+  public interface OutputManager {
+    /**
+     * Sends one windowed element to the receiver associated with {@code tag}.
+     *
+     * @param tag identifies the main or side output receiving the element
+     * @param output the windowed element to emit
+     */
+    <T> void output(TupleTag<T> tag, WindowedValue<T> output);
+  }
+
+  /**
+   * Returns a general-purpose {@link DoFnRunner} suitable for most {@link OldDoFn DoFns}. It
+   * calls {@link OldDoFn#processElement} once per input element.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return new SimpleDoFnRunner<>(
+        options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags,
+        stepContext, aggregatorFactory, windowingStrategy);
+  }
+
+  /**
+   * Returns a {@link DoFnRunner} for a {@link ReduceFnExecutor} that drops elements belonging to
+   * expired windows before they reach the executor's {@link OldDoFn}.
+   */
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
+          PipelineOptions options,
+          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
+          SideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainOutputTag,
+          List<TupleTag<?>> sideOutputTags,
+          StepContext stepContext,
+          AggregatorFactory aggregatorFactory,
+          WindowingStrategy<?, W> windowingStrategy) {
+    // The inner runner executes the executor's DoFn; the wrapper below filters late input and
+    // reports drops through the executor's lateness aggregator.
+    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> delegate = simpleRunner(
+        options, reduceFnExecutor.asDoFn(), sideInputReader, outputManager, mainOutputTag,
+        sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+    return new LateDataDroppingDoFnRunner<>(
+        delegate,
+        windowingStrategy,
+        stepContext.timerInternals(),
+        reduceFnExecutor.getDroppedDueToLatenessAggregator());
+  }
+
+  /**
+   * Returns the default {@link DoFnRunner} for {@code doFn}. This is a
+   * {@link #lateDataDroppingRunner} when {@code doFn} implements {@link ReduceFnExecutor}, and a
+   * {@link #simpleRunner} otherwise.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    if (!(doFn instanceof ReduceFnExecutor)) {
+      return simpleRunner(
+          options, doFn, sideInputReader, outputManager, mainOutputTag, sideOutputTags,
+          stepContext, aggregatorFactory, windowingStrategy);
+    }
+    // The executor's K/InputT/OutputT/W cannot be recovered from OldDoFn<InputT, OutputT>, so the
+    // late-data-dropping runner is assembled through raw types and cast back.
+    @SuppressWarnings("rawtypes")
+    ReduceFnExecutor reduceFnExecutor = (ReduceFnExecutor) doFn;
+    @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+    DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
+        options,
+        reduceFnExecutor,
+        sideInputReader,
+        outputManager,
+        (TupleTag) mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        aggregatorFactory,
+        (WindowingStrategy) windowingStrategy);
+    return runner;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java
new file mode 100644
index 0000000..2380ba9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+
+/**
+ * A source of encoded-size information: reports to an {@link ElementByteSizeObserver} how many
+ * bytes a value of type {@code T} occupies once encoded.
+ *
+ * @param <T> the type of the values being observed
+ */
+public interface ElementByteSizeObservable<T> {
+  /**
+   * Returns whether {@link #registerByteSizeObserver} is cheap enough to call for every element.
+   * That is, whether the encoded byte size of {@code value} can be computed in roughly constant
+   * time (or lazily).
+   *
+   * @param value the value whose size would be observed
+   * @return {@code true} if observing {@code value} is cheap
+   */
+  boolean isRegisterByteSizeObserverCheap(T value);
+
+  /**
+   * Reports the byte size of {@code value}'s encoded form to {@code observer}.
+   *
+   * @param value the value being measured
+   * @param observer the observer to notify
+   * @throws Exception if the size cannot be determined
+   */
+  void registerByteSizeObserver(T value, ElementByteSizeObserver observer) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 7cdab00..b427037 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,16 +17,13 @@
*/
package org.apache.beam.runners.core;
+import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
-import org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn;
import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.ReduceFnRunner;
import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
new file mode 100644
index 0000000..9851449
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Base {@link OldDoFn} that groups each key's values by window, merging windows when the
+ * windowing strategy requires it and optionally combining the grouped values.
+ *
+ * <p>Subclasses can record dropped input through the protected
+ * {@code droppedDueToClosedWindow} and {@code droppedDueToLateness} aggregators.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+@SystemDoFnInternal
+public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+  /** Aggregator name for elements dropped because their window was already closed. */
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+  /** Aggregator name for elements dropped for arriving too late. */
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+
+  /**
+   * Creates the default {@link GroupAlsoByWindowsDoFn}. This is a
+   * {@link GroupAlsoByWindowsViaOutputBufferDoFn} driven by
+   * {@code SystemReduceFn.buffering(inputCoder)}.
+   *
+   * @param windowingStrategy the window function and trigger to use for grouping
+   * @param stateInternalsFactory supplies per-key state to the created fn
+   * @param inputCoder the coder for the input values, passed to the buffering reduce fn
+   */
+  public static <K, V, W extends BoundedWindow>
+      GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault(
+          WindowingStrategy<?, W> windowingStrategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          Coder<V> inputCoder) {
+    return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+        windowingStrategy,
+        stateInternalsFactory,
+        SystemReduceFn.<K, V, W>buffering(inputCoder));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..091ad33
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.List;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * The default batch {@link GroupAlsoByWindowsDoFn} implementation, used when no specialized
+ * "fast path" implementation is applicable.
+ *
+ * <p>Each input element carries every value for one key, expected to be sorted by timestamp.
+ * The values are fed to a {@link ReduceFnRunner} in fixed-size chunks, and an emulated
+ * watermark is advanced between chunks.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+
+  /** Number of values handed to the {@link ReduceFnRunner} between watermark advances. */
+  private static final int CHUNK_SIZE = 1000;
+
+  private final WindowingStrategy<?, W> strategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+  /**
+   * @param windowingStrategy the windowing strategy to group by
+   * @param stateInternalsFactory supplies the state used for each key
+   * @param reduceFn the reduce function applied to each window's values
+   */
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    this.strategy = windowingStrategy;
+    this.reduceFn = reduceFn;
+    this.stateInternalsFactory = stateInternalsFactory;
+  }
+
+  @Override
+  public void processElement(
+      OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
+      throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
+    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<K, InputT, OutputT, W>(
+            key,
+            strategy,
+            stateInternals,
+            timerInternals,
+            c.windowingInternals(),
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    // Iterables.partition never yields an empty chunk, so chunk.iterator().next() is safe below.
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), CHUNK_SIZE);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      reduceFnRunner.processElements(chunk);
+
+      // Then, since elements are sorted by their timestamp, advance the input watermark to the
+      // timestamp of this chunk's first element, and fire any timers that may have been scheduled.
+      timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    reduceFnRunner.persist();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..b521425
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
+ * <ol>
+ * <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds
+ * the previously-implicit timestamp and window into the elements themselves, so a
+ * window-and-timestamp-unaware transform can operate on them.</li>
+ * <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
+ * and timestamps. Many window-unaware runners have such a primitive already.</li>
+ * <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables
+ * output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
+ * <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
+ * implemented as a {@link ParDo} that calls reserved internal methods.</li>
+ * </ol>
+ *
+ * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
+ * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
+ * execution strategy. Specifically:
+ *
+ * <ul>
+ * <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
+ * A streaming-style partition, with multiple elements for the same key, will not yield
+ * correct results.</li>
+ * <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
+ * for large iterables.</li>
+ * <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
+ * appropriate for runners which also do not support timers.</li>
+ * </ul>
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ private final GroupByKey<K, V> gbkTransform;
+
+ public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+ this.gbkTransform = originalTransform;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+ return input
+ // Make each input element's timestamp and assigned windows
+ // explicit, in the value part.
+ .apply(new ReifyTimestampsAndWindows<K, V>())
+
+ // Group by just the key.
+ // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+ // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+ // introduced in here.
+ .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+ // Sort each key's values by timestamp. GroupAlsoByWindow requires
+ // its input to be sorted by timestamp.
+ .apply(new SortValuesByTimestamp<K, V>())
+
+ // Group each key's values by window, merging windows as needed.
+ .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+ // And update the windowing strategy as appropriate.
+ .setWindowingStrategyInternal(
+ gbkTransform.updateWindowingStrategy(windowingStrategy));
+ }
+
+ /**
+ * Runner-specific primitive that groups by key only, ignoring any window assignments. A
+ * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
+ * or evaluate this class.
+ */
+ public static class GroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+
+ @Override
+ public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+ return GroupByKey.getOutputKvCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that sorts the values associated with each key by timestamp.
+ */
+ private static class SortValuesByTimestamp<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+ @Override
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ return input
+ .apply(
+ ParDo.of(
+ new OldDoFn<
+ KV<K, Iterable<WindowedValue<V>>>,
+ KV<K, Iterable<WindowedValue<V>>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+ K key = kvs.getKey();
+ Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+ List<WindowedValue<V>> sortedValues = new ArrayList<>();
+ for (WindowedValue<V> value : unsortedValues) {
+ sortedValues.add(value);
+ }
+ Collections.sort(
+ sortedValues,
+ new Comparator<WindowedValue<V>>() {
+ @Override
+ public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+ return e1.getTimestamp().compareTo(e2.getTimestamp());
+ }
+ });
+ c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+ }
+ }))
+ .setCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Runner-specific primitive that takes a collection of timestamp-ordered values associated with
+ * each key, groups the values by window, merges windows as needed, and for each window in each
+ * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with
+ * the timestamp derived from that window.
+ */
+ public static class GroupAlsoByWindow<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+ private final WindowingStrategy<?, ?> windowingStrategy;
+
+ public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ }
+
+ public WindowingStrategy<?, ?> getWindowingStrategy() {
+ return windowingStrategy;
+ }
+
+ private KvCoder<K, Iterable<WindowedValue<V>>> getKvCoder(
+ Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+ // Coder<KV<...>> --> KvCoder<...>
+ checkArgument(inputCoder instanceof KvCoder,
+ "%s requires a %s<...> but got %s",
+ getClass().getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ inputCoder);
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> kvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) inputCoder;
+ return kvCoder;
+ }
+
+ public Coder<K> getKeyCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+ return getKvCoder(inputCoder).getKeyCoder();
+ }
+
+ public Coder<V> getValueCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+ // Coder<Iterable<...>> --> IterableCoder<...>
+ Coder<Iterable<WindowedValue<V>>> iterableWindowedValueCoder =
+ getKvCoder(inputCoder).getValueCoder();
+ checkArgument(iterableWindowedValueCoder instanceof IterableCoder,
+ "%s requires a %s<..., %s> but got a %s",
+ getClass().getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ IterableCoder.class.getSimpleName(),
+ iterableWindowedValueCoder);
+ IterableCoder<WindowedValue<V>> iterableCoder =
+ (IterableCoder<WindowedValue<V>>) iterableWindowedValueCoder;
+
+ // Coder<WindowedValue<...>> --> WindowedValueCoder<...>
+ Coder<WindowedValue<V>> iterableElementCoder = iterableCoder.getElemCoder();
+ checkArgument(iterableElementCoder instanceof WindowedValueCoder,
+ "%s requires a %s<..., %s<%s>> but got a %s",
+ getClass().getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ IterableCoder.class.getSimpleName(),
+ WindowedValueCoder.class.getSimpleName(),
+ iterableElementCoder);
+ WindowedValueCoder<V> windowedValueCoder =
+ (WindowedValueCoder<V>) iterableElementCoder;
+
+ return windowedValueCoder.getValueCoder();
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+ IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+ (IterableCoder<WindowedValue<V>>) inputValueCoder;
+ Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+ WindowedValueCoder<V> inputIterableWindowedValueCoder =
+ (WindowedValueCoder<V>) inputIterableElementCoder;
+
+ Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+ Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
+ Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), windowingStrategy, input.isBounded())
+ .setCoder(outputKvCoder);
+ }
+
+ private <W extends BoundedWindow>
+ GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+ WindowingStrategy<?, W> strategy,
+ StateInternalsFactory<K> stateInternalsFactory,
+ Coder<V> inputIterableElementValueCoder) {
+ return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+ strategy,
+ stateInternalsFactory,
+ SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
new file mode 100644
index 0000000..63a80d2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * A customized {@link DoFnRunner} that handles late data dropping for
+ * a {@link KeyedWorkItem} input {@link OldDoFn}.
+ *
+ * <p>It expands windows before checking data lateness.
+ *
+ * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
+ implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
+ private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
+ private final LateDataFilter lateDataFilter;
+
+ public LateDataDroppingDoFnRunner(
+ DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TimerInternals timerInternals,
+ Aggregator<Long, Long> droppedDueToLateness) {
+ this.doFnRunner = doFnRunner;
+ lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
+ }
+
+ @Override
+ public void startBundle() {
+ doFnRunner.startBundle();
+ }
+
+ @Override
+ public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
+ Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter(
+ elem.getValue().key(), elem.getValue().elementsIterable());
+ KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem(
+ elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
+ doFnRunner.processElement(elem.withValue(keyedWorkItem));
+ }
+
+ @Override
+ public void finishBundle() {
+ doFnRunner.finishBundle();
+ }
+
+ /**
+ * It filters late data in a {@link KeyedWorkItem}.
+ */
+ @VisibleForTesting
+ static class LateDataFilter {
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final TimerInternals timerInternals;
+ private final Aggregator<Long, Long> droppedDueToLateness;
+
+ public LateDataFilter(
+ WindowingStrategy<?, ?> windowingStrategy,
+ TimerInternals timerInternals,
+ Aggregator<Long, Long> droppedDueToLateness) {
+ this.windowingStrategy = windowingStrategy;
+ this.timerInternals = timerInternals;
+ this.droppedDueToLateness = droppedDueToLateness;
+ }
+
+ /**
+ * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains
+ * non-late input elements.
+ */
+ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
+ final K key, Iterable<WindowedValue<InputT>> elements) {
+ Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform(
+ elements,
+ new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() {
+ @Override
+ public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
+ return Iterables.transform(
+ input.getWindows(),
+ new Function<BoundedWindow, WindowedValue<InputT>>() {
+ @Override
+ public WindowedValue<InputT> apply(BoundedWindow window) {
+ return WindowedValue.of(
+ input.getValue(), input.getTimestamp(), window, input.getPane());
+ }
+ });
+ }});
+
+ Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
+ Iterables.concat(windowsExpandedElements),
+ new Predicate<WindowedValue<InputT>>() {
+ @Override
+ public boolean apply(WindowedValue<InputT> input) {
+ BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+ if (canDropDueToExpiredWindow(window)) {
+ // The element is too late for this window.
+ droppedDueToLateness.addValue(1L);
+ WindowTracing.debug(
+ "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ + "since too far behind inputWatermark:{}; outputWatermark:{}",
+ input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ return false;
+ } else {
+ return true;
+ }
+ }
+ });
+ return nonLateElements;
+ }
+
+ /** Is {@code window} expired w.r.t. the garbage collection watermark? */
+ private boolean canDropDueToExpiredWindow(BoundedWindow window) {
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+ return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
new file mode 100644
index 0000000..3e51dfb
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
+ * their last triggering.
+ *
+ * @param <K> The type of the keys whose per-window state is accessed.
+ * @param <W> The kind of windows being tracked.
+ */
+public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
+
+  /**
+   * Returns the tracker suited to {@code strategy}'s accumulation mode. In
+   * {@link AccumulationMode#DISCARDING_FIRED_PANES} mode, emptiness is read from
+   * {@code reduceFn}'s own state. In every other mode, a separate per-window counter of recorded
+   * additions is kept.
+   */
+  static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
+      WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
+    if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
+      return new DiscardingModeNonEmptyPanes<>(reduceFn);
+    } else {
+      return new GeneralNonEmptyPanes<>();
+    }
+  }
+
+  /**
+   * Record that some content has been added to the window in {@code state}, and therefore the
+   * current pane is not empty.
+   */
+  public abstract void recordContent(StateAccessor<K> state);
+
+  /**
+   * Record that the current pane of the window in {@code state} is empty.
+   */
+  public abstract void clearPane(StateAccessor<K> state);
+
+  /**
+   * Return true if the current pane for the window in {@code state} is empty.
+   */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> state);
+
+  /**
+   * Prefetch in preparation for merging.
+   */
+  public abstract void prefetchOnMerge(MergingStateAccessor<K, W> context);
+
+  /**
+   * Eagerly merge backing state.
+   */
+  public abstract void onMerge(MergingStateAccessor<K, W> context);
+
+  /**
+   * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
+   * presence of data in the accumulation buffer to record non-empty panes, so it keeps no state
+   * of its own and every mutating/merging hook is a no-op.
+   */
+  private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    // Assigned once at construction; emptiness queries are delegated to it.
+    private final ReduceFn<K, ?, ?, W> reduceFn;
+
+    private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
+      this.reduceFn = reduceFn;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return reduceFn.isEmpty(state);
+    }
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> context) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> context) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+  }
+
+  /**
+   * An implementation of {@code NonEmptyPanes} for general use. Keeps a per-window sum of the
+   * additions recorded via {@link #recordContent}; the pane is empty while that state is empty.
+   */
+  private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    // Summing combiner over recordContent calls; cleared by clearPane, combined when merging.
+    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+        PANE_ADDITIONS_TAG =
+        StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+            "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).add(1L);
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).clear();
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return state.access(PANE_ADDITIONS_TAG).isEmpty();
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> context) {
+      StateMerging.prefetchCombiningValues(context, PANE_ADDITIONS_TAG);
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> context) {
+      StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG);
+    }
+  }
+}