You are viewing a plain-text version of this content. The canonical link to the commit is given on the "Commit:" line below.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:35:23 UTC
[40/50] [abbrv] beam git commit: Removes OldDoFn and its kin from runners-core
Removes OldDoFn and its kin from runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6127f532
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6127f532
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6127f532
Branch: refs/heads/jstorm-runner
Commit: 6127f532b7e6ad0f13926d3d9aec17eb538108ed
Parents: 00b4a30
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri May 12 11:18:57 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 16 11:55:43 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/AssignWindowsDoFn.java | 78 --
.../apache/beam/runners/core/DoFnAdapters.java | 381 ----------
.../apache/beam/runners/core/DoFnRunners.java | 27 +-
.../GroupAlsoByWindowViaOutputBufferDoFn.java | 113 ---
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 94 ---
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 6 +-
.../core/GroupAlsoByWindowsAggregators.java | 26 +
.../runners/core/GroupAlsoByWindowsDoFn.java | 39 -
.../core/LateDataDroppingDoFnRunner.java | 3 +-
.../org/apache/beam/runners/core/OldDoFn.java | 335 ---------
.../beam/runners/core/SimpleOldDoFnRunner.java | 499 -------------
.../core/WindowingInternalsAdapters.java | 74 --
...roupAlsoByWindowViaOutputBufferDoFnTest.java | 109 ---
.../core/GroupAlsoByWindowsProperties.java | 744 -------------------
.../apache/beam/runners/core/NoOpOldDoFn.java | 65 --
.../apache/beam/runners/core/OldDoFnTest.java | 51 --
.../runners/core/SimpleOldDoFnRunnerTest.java | 86 ---
.../GroupAlsoByWindowEvaluatorFactory.java | 6 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 9 +-
.../spark/translation/SparkAssignWindowFn.java | 4 +-
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 4 +-
21 files changed, 43 insertions(+), 2710 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
deleted file mode 100644
index bbf3574..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
- * provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
- implements RequiresWindowAccess {
- private WindowFn<? super T, W> fn;
-
- public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
- this.fn =
- checkNotNull(
- fn,
- "%s provided to %s cannot be null",
- WindowFn.class.getSimpleName(),
- AssignWindowsDoFn.class.getSimpleName());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void processElement(final ProcessContext c) throws Exception {
- Collection<W> windows =
- ((WindowFn<T, W>) fn).assignWindows(
- ((WindowFn<T, W>) fn).new AssignContext() {
- @Override
- public T element() {
- return c.element();
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(c.windowingInternals().windows());
- }
- });
-
- c.windowingInternals()
- .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
deleted file mode 100644
index af59a40..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.IOException;
-import org.apache.beam.runners.core.OldDoFn.Context;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- * DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
- /** Should not be instantiated. */
- private DoFnAdapters() {}
-
- /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
- @SuppressWarnings({"unchecked", "rawtypes"})
- public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
- DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
- if (signature.processElement().observesWindow()) {
- return new WindowDoFnAdapter<>(fn);
- } else {
- return new SimpleDoFnAdapter<>(fn);
- }
- }
-
- /**
- * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
- * OldDoFn}.
- */
- private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
- private final DoFn<InputT, OutputT> fn;
- private transient DoFnInvoker<InputT, OutputT> invoker;
-
- SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
- this.fn = fn;
- this.invoker = DoFnInvokers.invokerFor(fn);
- }
-
- @Override
- public void setup() throws Exception {
- this.invoker.invokeSetup();
- }
-
- @Override
- public void startBundle(Context c) throws Exception {
- fn.prepareForProcessing();
- invoker.invokeStartBundle(new StartBundleContextAdapter<>(fn, c));
- }
-
- @Override
- public void finishBundle(Context c) throws Exception {
- invoker.invokeFinishBundle(new FinishBundleContextAdapter<>(fn, c));
- }
-
- @Override
- public void teardown() throws Exception {
- this.invoker.invokeTeardown();
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
- invoker.invokeProcessElement(adapter);
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return fn.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(fn);
- }
-
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- this.invoker = DoFnInvokers.invokerFor(fn);
- }
- }
-
- /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
- private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
- implements OldDoFn.RequiresWindowAccess {
-
- WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
- super(fn);
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
- * DoFn.StartBundle} method, which means the extra context is unavailable.
- */
- private static class StartBundleContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.StartBundleContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
- private OldDoFn<InputT, OutputT>.Context context;
-
- private StartBundleContextAdapter(DoFn<InputT, OutputT> fn, Context context) {
- fn.super();
- this.context = context;
- }
-
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public BoundedWindow window() {
- // The OldDoFn doesn't allow us to ask for these outside processElement, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get the window in processElement; elsewhere there is no defined window.");
- }
-
- @Override
- public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Can only get a FinishBundleContext in finishBundle");
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Can only get a ProcessContext in processElement");
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Timers are not supported for OldDoFn");
- }
-
- @Override
- public RestrictionTracker<?> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
- * DoFn.FinishBundle} method, which means the extra context is unavailable.
- */
- private static class FinishBundleContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.FinishBundleContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.Context context;
-
- private FinishBundleContextAdapter(DoFn<InputT, OutputT> fn, Context context) {
- fn.super();
- this.context = context;
- }
-
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public BoundedWindow window() {
- // The OldDoFn doesn't allow us to ask for these outside processElement, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get the window in processElement; elsewhere there is no defined window.");
- }
-
- @Override
- public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Can only get a StartBundleContext in startBundle");
- }
-
- @Override
- public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Can only get a ProcessContext in processElement");
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Timers are not supported for OldDoFn");
- }
-
- @Override
- public RestrictionTracker<?> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
-
- @Override
- public void output(
- OutputT output, Instant timestamp, BoundedWindow window) {
- // Not full fidelity conversion. This should be removed as soon as possible.
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void output(
- TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
- // Not full fidelity conversion. This should be removed as soon as possible.
- context.outputWithTimestamp(tag, output, timestamp);
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
- */
- private static class ProcessContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.ProcessContext context;
-
- private ProcessContextAdapter(
- DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return context.sideInput(view);
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- context.output(tag, output);
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.outputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- public InputT element() {
- return context.element();
- }
-
- @Override
- public Instant timestamp() {
- return context.timestamp();
- }
-
- @Override
- public PaneInfo pane() {
- return context.pane();
- }
-
- @Override
- public void updateWatermark(Instant watermark) {
- throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
- }
-
- @Override
- public BoundedWindow window() {
- return context.window();
- }
-
- @Override
- public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
- return null;
- }
-
- @Override
- public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
- return null;
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
- }
-
- @Override
- public RestrictionTracker<?> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index ee3aefa..f3cca6f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -74,34 +74,9 @@ public class DoFnRunners {
}
/**
- * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}.
- *
- * <p>It invokes {@link OldDoFn#processElement} for each input.
- */
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
- PipelineOptions options,
- OldDoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- StepContext stepContext,
- WindowingStrategy<?, ?> windowingStrategy) {
- return new SimpleOldDoFnRunner<>(
- options,
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- windowingStrategy);
- }
-
- /**
* Returns an implementation of {@link DoFnRunner} that handles late data dropping.
*
- * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
+ * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
*/
public static <K, InputT, OutputT, W extends BoundedWindow>
DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
deleted file mode 100644
index a160553..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.construction.Triggers;
-import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
-import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Instant;
-
-/**
- * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
- * implementation is applicable.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
- extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
-
- private final WindowingStrategy<?, W> strategy;
- private final StateInternalsFactory<K> stateInternalsFactory;
- private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
- public GroupAlsoByWindowViaOutputBufferDoFn(
- WindowingStrategy<?, W> windowingStrategy,
- StateInternalsFactory<K> stateInternalsFactory,
- SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
- this.strategy = windowingStrategy;
- this.reduceFn = reduceFn;
- this.stateInternalsFactory = stateInternalsFactory;
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- K key = c.element().getKey();
- // Used with Batch, we know that all the data is available for this key. We can't use the
- // timer manager from the context because it doesn't exist. So we create one and emulate the
- // watermark, knowing that we have all data and it is in timestamp order.
- InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
- timerInternals.advanceProcessingTime(Instant.now());
- timerInternals.advanceSynchronizedProcessingTime(Instant.now());
- StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-
- ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
- new ReduceFnRunner<>(
- key,
- strategy,
- ExecutableTriggerStateMachine.create(
- TriggerStateMachines.stateMachineForTrigger(
- Triggers.toProto(strategy.getTrigger()))),
- stateInternals,
- timerInternals,
- WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
- WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
- reduceFn,
- c.getPipelineOptions());
-
- // Process the elements.
- reduceFnRunner.processElements(c.element().getValue());
-
- // Finish any pending windows by advancing the input watermark to infinity.
- timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
- // Finally, advance the processing time to infinity to fire any timers.
- timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
- timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
- fireEligibleTimers(timerInternals, reduceFnRunner);
-
- reduceFnRunner.persist();
- }
-
- private void fireEligibleTimers(
- InMemoryTimerInternals timerInternals, ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner)
- throws Exception {
- List<TimerInternals.TimerData> timers = new ArrayList<>();
- while (true) {
- TimerInternals.TimerData timer;
- while ((timer = timerInternals.removeNextEventTimer()) != null) {
- timers.add(timer);
- }
- while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
- timers.add(timer);
- }
- while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
- timers.add(timer);
- }
- if (timers.isEmpty()) {
- break;
- }
- reduceFnRunner.onTimers(timers);
- timers.clear();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
deleted file mode 100644
index 2342c52..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.runners.core.construction.Triggers;
-import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
-import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
- * {@link ReduceFnRunner}.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowViaWindowSetDoFn<
- K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
- extends OldDoFn<RinT, KV<K, OutputT>> {
-
- public static <K, InputT, OutputT, W extends BoundedWindow>
- OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
- WindowingStrategy<?, W> strategy,
- StateInternalsFactory<K> stateInternalsFactory,
- SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
- return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
- }
-
- private final WindowingStrategy<Object, W> windowingStrategy;
- private final StateInternalsFactory<K> stateInternalsFactory;
- private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
- private GroupAlsoByWindowViaWindowSetDoFn(
- WindowingStrategy<?, W> windowingStrategy,
- StateInternalsFactory<K> stateInternalsFactory,
- SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
- @SuppressWarnings("unchecked")
- WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
- this.windowingStrategy = noWildcard;
- this.reduceFn = reduceFn;
- this.stateInternalsFactory = stateInternalsFactory;
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
-
- K key = keyedWorkItem.key();
- TimerInternals timerInternals = c.windowingInternals().timerInternals();
- StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-
- ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
- new ReduceFnRunner<>(
- key,
- windowingStrategy,
- ExecutableTriggerStateMachine.create(
- TriggerStateMachines.stateMachineForTrigger(
- Triggers.toProto(windowingStrategy.getTrigger()))),
- stateInternals,
- timerInternals,
- WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
- WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
- reduceFn,
- c.getPipelineOptions());
-
- reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
- reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
- reduceFnRunner.persist();
- }
-
- public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
- // Safe contravariant cast
- @SuppressWarnings("unchecked")
- OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
- (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
- return asFn;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 5b82d1f..744d162 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * A general group-also-by-window {@code DoFn}. This delegates all of the logic to the
* {@link ReduceFnRunner}.
*/
@SystemDoFnInternal
@@ -134,8 +134,4 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
reduceFnRunner.persist();
}
-
- public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
- throw new RuntimeException("Not implement!");
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
new file mode 100644
index 0000000..8d96257
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+/**
+ * Common aggregator names for {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow}.
+ */
+public abstract class GroupAlsoByWindowsAggregators {
+ public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+ public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
deleted file mode 100644
index 2bd9ee0..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
- * combining values.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-@SystemDoFnInternal
-public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
- extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
- public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
- public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 570f524..1cf1509 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
@@ -33,7 +34,7 @@ import org.joda.time.Instant;
/**
* A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link OldDoFn}.
+ * a {@link KeyedWorkItem} input {@link DoFn}.
*
* <p>It expands windows before checking data lateness.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
deleted file mode 100644
index 41bb598..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code OldDoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code OldDoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output. Unit testing of a {@code OldDoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFn} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}.
- */
-@Deprecated
-public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
- /**
- * Information accessible to all methods in this {@code OldDoFn}.
- * Used primarily to output elements.
- */
- public abstract class Context {
-
- /**
- * Returns the {@code PipelineOptions} specified with the
- * {@link PipelineRunner}
- * invoking this {@code OldDoFn}. The {@code PipelineOptions} will
- * be the default running via {@link DoFnTester}.
- */
- public abstract PipelineOptions getPipelineOptions();
-
- /**
- * Adds the given element to the main output {@code PCollection}.
- *
- * <p>Once passed to {@code output} the element should be considered
- * immutable and not be modified in any way. It may be cached or retained
- * by a Beam runner or later steps in the pipeline, or used in
- * other unspecified ways.
- *
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
- * element will have the same timestamp and be in the same windows
- * as the input element passed to {@link OldDoFn#processElement processElement}.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- */
- public abstract void output(OutputT output);
-
- /**
- * Adds the given element to the main output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code outputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
- * must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- */
- public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
- /**
- * Adds the given element to the output {@code PCollection} with the
- * given tag.
- *
- * <p>Once passed to {@code output} the element should not be modified
- * in any way.
- *
- * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags}
- * to specify the tags of outputs that it consumes. Outputs that are not consumed, e.g., outputs
- * for monitoring purposes only, don't necessarily need to be specified.
- *
- * <p>The output element will have the same timestamp and be in the same
- * windows as the input element passed to {@link OldDoFn#processElement processElement}.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- *
- * @see ParDo.SingleOutput#withOutputTags
- */
- public abstract <T> void output(TupleTag<T> tag, T output);
-
- /**
- * Adds the given element to the specified output {@code PCollection}, with the given timestamp.
- *
- * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way.
- *
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp must not be
- * older than the input element's timestamp minus {@link OldDoFn#getAllowedTimestampSkew
- * getAllowedTimestampSkew}. The output element will be in the same windows as the input
- * element.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the
- * input {@code PCollection} to determine what windows the element should be in, throwing an
- * exception if the {@code WindowFn} attempts to access any information about the input element
- * except for the timestamp.
- *
- * @see ParDo.SingleOutput#withOutputTags
- */
- public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
- }
-
- /**
- * Information accessible when running {@link OldDoFn#processElement}.
- */
- public abstract class ProcessContext extends Context {
-
- /**
- * Returns the input element to be processed.
- *
- * <p>The element should be considered immutable. A Beam runner will not mutate the
- * element, so it is safe to cache, etc. The element should not be mutated by any of the
- * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner
- * runtime, or used in other unspecified ways.
- */
- public abstract InputT element();
-
- /**
- * Returns the value of the side input for the window corresponding to the
- * window of the main input element.
- *
- * <p>See
- * {@link WindowMappingFn#getSideInputWindow}
- * for how this corresponding window is determined.
- *
- * @throws IllegalArgumentException if this is not a side input
- * @see ParDo.SingleOutput#withSideInputs
- */
- public abstract <T> T sideInput(PCollectionView<T> view);
-
- /**
- * Returns the timestamp of the input element.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract Instant timestamp();
-
- /**
- * Returns the window into which the input element has been assigned.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- *
- * @throws UnsupportedOperationException if this {@link OldDoFn} does
- * not implement {@link RequiresWindowAccess}.
- */
- public abstract BoundedWindow window();
-
- /**
- * Returns information about the pane within this window into which the
- * input element has been assigned.
- *
- * <p>Generally all data is in a single, uninteresting pane unless custom
- * triggering and/or late data has been explicitly requested.
- * See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract PaneInfo pane();
-
- /**
- * Returns the process context to use for implementing windowing.
- */
- @Experimental
- public abstract WindowingInternals<InputT, OutputT> windowingInternals();
- }
-
- /**
- * Returns the allowed timestamp skew duration, which is the maximum
- * duration that timestamps can be shifted backward in
- * {@link OldDoFn.Context#outputWithTimestamp}.
- *
- * <p>The default value is {@code Duration.ZERO}, in which case
- * timestamps can only be shifted forward to future. For infinite
- * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
- *
- * <p>Note that producing an element whose timestamp is less than the
- * current timestamp may result in late data, i.e. returning a non-zero
- * value here does not impact watermark calculations used for firing
- * windows.
- *
- * @deprecated does not interact well with the watermark.
- */
- @Deprecated
- public Duration getAllowedTimestampSkew() {
- return Duration.ZERO;
- }
-
- /**
- * Interface for signaling that a {@link OldDoFn} needs to access the window the
- * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
- */
- @Experimental
- public interface RequiresWindowAccess {}
-
- public OldDoFn() {
- }
-
- /**
- * Prepares this {@link DoFn} instance for processing bundles.
- *
- * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other
- * {@link DoFn} method is called.
- *
- * <p>By default, does nothing.
- */
- public void setup() throws Exception {
- }
-
- /**
- * Prepares this {@code OldDoFn} instance for processing a batch of elements.
- *
- * <p>By default, does nothing.
- */
- public void startBundle(Context c) throws Exception {
- }
-
- /**
- * Processes one input element.
- *
- * <p>The current element of the input {@code PCollection} is returned by
- * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam
- * runner will not mutate the element, so it is safe to cache, etc. The element should not be
- * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
- * the Beam runner, or used in other unspecified ways.
- *
- * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
- * Once passed to {@code output} the element should be considered immutable and not be modified in
- * any way. It may be cached elsewhere, retained by the Beam runner, or used in other
- * unspecified ways.
- *
- * @see ProcessContext
- */
- public abstract void processElement(ProcessContext c) throws Exception;
-
- /**
- * Finishes processing this batch of elements.
- *
- * <p>By default, does nothing.
- */
- public void finishBundle(Context c) throws Exception {
- }
-
- /**
- * Cleans up this {@link DoFn}.
- *
- * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn}
- * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other
- * {@link DoFn} methods will be called after a call to {@link #teardown()}.
- *
- * <p>By default, does nothing.
- */
- public void teardown() throws Exception {
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
deleted file mode 100644
index 2a0b688..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-/**
- * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
- *
- * @param <InputT> the type of the {@link OldDoFn} (main) input elements
- * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
- */
-class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
- /** The {@link OldDoFn} being run. */
- private final OldDoFn<InputT, OutputT> fn;
- /** The context used for running the {@link OldDoFn}. */
- private final DoFnContext<InputT, OutputT> context;
-
- public SimpleOldDoFnRunner(
- PipelineOptions options,
- OldDoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- StepContext stepContext,
- WindowingStrategy<?, ?> windowingStrategy) {
- this.fn = fn;
- this.context = new DoFnContext<>(
- options,
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- windowingStrategy == null ? null : windowingStrategy.getWindowFn());
- }
-
- @Override
- public void startBundle() {
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.startBundle(context);
- } catch (Throwable t) {
- // Exception in user code.
- throw wrapUserCodeException(t);
- }
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- if (elem.getWindows().size() <= 1
- || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
- && context.sideInputReader.isEmpty())) {
- invokeProcessElement(elem);
- } else {
- // We could modify the windowed value (and the processContext) to
- // avoid repeated allocations, but this is more straightforward.
- for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
- invokeProcessElement(windowedValue);
- }
- }
- }
-
- @Override
- public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
- TimeDomain timeDomain) {
- throw new UnsupportedOperationException(
- String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName()));
- }
-
- private void invokeProcessElement(WindowedValue<InputT> elem) {
- final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.processElement(processContext);
- } catch (Exception ex) {
- throw wrapUserCodeException(ex);
- }
- }
-
- @Override
- public void finishBundle() {
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.finishBundle(context);
- } catch (Throwable t) {
- // Exception in user code.
- throw wrapUserCodeException(t);
- }
- }
-
- /**
- * Returns a new {@link OldDoFn.ProcessContext} for the given element.
- */
- private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
- WindowedValue<InputT> elem) {
- return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
- }
-
- private RuntimeException wrapUserCodeException(Throwable t) {
- throw UserCodeException.wrapIf(!isSystemDoFn(), t);
- }
-
- private boolean isSystemDoFn() {
- return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
- }
-
- /**
- * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
- *
- * @param <InputT> the type of the {@link OldDoFn} (main) input elements
- * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
- */
- private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
- private static final int MAX_SIDE_OUTPUTS = 1000;
-
- final PipelineOptions options;
- final OldDoFn<InputT, OutputT> fn;
- final SideInputReader sideInputReader;
- final OutputManager outputManager;
- final TupleTag<OutputT> mainOutputTag;
- final StepContext stepContext;
- final WindowFn<?, ?> windowFn;
-
- /**
- * The set of known output tags, some of which may be undeclared, so we can throw an
- * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
- */
- private Set<TupleTag<?>> outputTags;
-
- public DoFnContext(PipelineOptions options,
- OldDoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- StepContext stepContext,
- WindowFn<?, ?> windowFn) {
- fn.super();
- this.options = options;
- this.fn = fn;
- this.sideInputReader = sideInputReader;
- this.outputManager = outputManager;
- this.mainOutputTag = mainOutputTag;
- this.outputTags = Sets.newHashSet();
-
- outputTags.add(mainOutputTag);
- for (TupleTag<?> additionalOutputTag : additionalOutputTags) {
- outputTags.add(additionalOutputTag);
- }
-
- this.stepContext = stepContext;
- this.windowFn = windowFn;
- }
-
- //////////////////////////////////////////////////////////////////////////////
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
- T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
- final Instant inputTimestamp = timestamp;
-
- if (timestamp == null) {
- timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- if (windows == null) {
- try {
- // The windowFn can never succeed at accessing the element, so its type does not
- // matter here
- @SuppressWarnings("unchecked")
- WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
- windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
- @Override
- public Object element() {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input element when none was available");
- }
-
- @Override
- public Instant timestamp() {
- if (inputTimestamp == null) {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input timestamp when none was available");
- }
- return inputTimestamp;
- }
-
- @Override
- public W window() {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input windows when none were available");
- }
- });
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- return WindowedValue.of(output, timestamp, windows, pane);
- }
-
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
- if (!sideInputReader.contains(view)) {
- throw new IllegalArgumentException("calling sideInput() with unknown view");
- }
- return sideInputReader.get(view, sideInputWindow);
- }
-
- void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
- }
-
- void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
- outputManager.output(mainOutputTag, windowedElem);
- if (stepContext != null) {
- stepContext.noteOutput(windowedElem);
- }
- }
-
- private <T> void outputWindowedValue(TupleTag<T> tag,
- T output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
- }
-
- private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
- if (!outputTags.contains(tag)) {
- // This tag wasn't declared nor was it seen before during this execution.
- // Thus, this must be a new, undeclared and unconsumed output.
- // To prevent likely user errors, enforce the limit on the number of side
- // outputs.
- if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
- throw new IllegalArgumentException(
- "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
- }
- outputTags.add(tag);
- }
-
- outputManager.output(tag, windowedElem);
- if (stepContext != null) {
- stepContext.noteOutput(tag, windowedElem);
- }
- }
-
- // Following implementations of output, outputWithTimestamp, and output
- // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
- // ProcessContext's versions in OldDoFn.processElement.
- @Override
- public void output(OutputT output) {
- outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- checkNotNull(tag, "TupleTag passed to output cannot be null");
- outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null");
- outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
- }
- }
-
- /**
- * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn}
- * over a single element.
- *
- * @param <InputT> the type of the {@link OldDoFn} (main) input elements
- * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
- */
- private static class DoFnProcessContext<InputT, OutputT>
- extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-
- final OldDoFn<InputT, OutputT> fn;
- final DoFnContext<InputT, OutputT> context;
- final WindowedValue<InputT> windowedValue;
-
- public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
- DoFnContext<InputT, OutputT> context,
- WindowedValue<InputT> windowedValue) {
- fn.super();
- this.fn = fn;
- this.context = context;
- this.windowedValue = windowedValue;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public InputT element() {
- return windowedValue.getValue();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- checkNotNull(view, "View passed to sideInput cannot be null");
- Iterator<? extends BoundedWindow> windowIter = windows().iterator();
- BoundedWindow window;
- if (!windowIter.hasNext()) {
- if (context.windowFn instanceof GlobalWindows) {
- // TODO: Remove this once GroupByKeyOnly no longer outputs elements
- // without windows
- window = GlobalWindow.INSTANCE;
- } else {
- throw new IllegalStateException(
- "sideInput called when main input element is not in any windows");
- }
- } else {
- window = windowIter.next();
- if (windowIter.hasNext()) {
- throw new IllegalStateException(
- "sideInput called when main input element is in multiple windows");
- }
- }
- return context.sideInput(
- view, view.getWindowMappingFn().getSideInputWindow(window));
- }
-
- @Override
- public BoundedWindow window() {
- if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
- throw new UnsupportedOperationException(
- "window() is only available in the context of a OldDoFn marked as"
- + "RequiresWindowAccess.");
- }
- return Iterables.getOnlyElement(windows());
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public void output(OutputT output) {
- context.outputWindowedValue(windowedValue.withValue(output));
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- checkTimestamp(timestamp);
- context.outputWindowedValue(output, timestamp,
- windowedValue.getWindows(), windowedValue.getPane());
- }
-
- void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- context.outputWindowedValue(output, timestamp, windows, pane);
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- checkNotNull(tag, "Tag passed to output cannot be null");
- context.outputWindowedValue(tag, windowedValue.withValue(output));
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
- checkTimestamp(timestamp);
- context.outputWindowedValue(
- tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
- }
-
- @Override
- public Instant timestamp() {
- return windowedValue.getTimestamp();
- }
-
- public Collection<? extends BoundedWindow> windows() {
- return windowedValue.getWindows();
- }
-
- private void checkTimestamp(Instant timestamp) {
- if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
- throw new IllegalArgumentException(String.format(
- "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
- + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
- + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
- timestamp, windowedValue.getTimestamp(),
- PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
- }
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return new WindowingInternals<InputT, OutputT>() {
- @Override
- public void outputWindowedValue(OutputT output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane) {
- context.outputWindowedValue(output, timestamp, windows, pane);
- }
-
- @Override
- public <AdditionalOutputT> void outputWindowedValue(
- TupleTag<AdditionalOutputT> tag,
- AdditionalOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- context.outputWindowedValue(tag, output, timestamp, windows, pane);
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return windowedValue.getWindows();
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public TimerInternals timerInternals() {
- return context.stepContext.timerInternals();
- }
-
- @Override
- public StateInternals stateInternals() {
- return context.stepContext.stateInternals();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
- return context.sideInput(view, sideInputWindow);
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
deleted file mode 100644
index 4a58445..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * Adapters from {@link WindowingInternals} to {@link SideInputReader} and {@link
- * OutputWindowedValue}.
- */
-public class WindowingInternalsAdapters {
- static SideInputReader sideInputReader(final WindowingInternals<?, ?> windowingInternals) {
- return new SideInputReader() {
- @Override
- public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
- return windowingInternals.sideInput(view, sideInputWindow);
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isEmpty() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- public static <OutputT> OutputWindowedValue<OutputT> outputWindowedValue(
- final WindowingInternals<?, OutputT> windowingInternals) {
- return new OutputWindowedValue<OutputT>() {
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
- }
-
- @Override
- public <AdditionalOutputT> void outputWindowedValue(
- TupleTag<AdditionalOutputT> tag,
- AdditionalOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- windowingInternals.outputWindowedValue(tag, output, timestamp, windows, pane);
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
deleted file mode 100644
index a265ead..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.runners.core.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link GroupAlsoByWindowViaOutputBufferDoFn}.
- */
-@RunWith(JUnit4.class)
-public class GroupAlsoByWindowViaOutputBufferDoFnTest {
-
- private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
- implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
-
- private final Coder<InputT> inputCoder;
-
- public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
- this.inputCoder = inputCoder;
- }
-
- @Override
- public <W extends BoundedWindow>
- GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
- WindowingStrategy<?, W> windowingStrategy,
- StateInternalsFactory<K> stateInternalsFactory) {
- return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
- windowingStrategy,
- stateInternalsFactory,
- SystemReduceFn.<K, InputT, W>buffering(inputCoder));
- }
- }
-
- @Test
- public void testEmptyInputEmptyOutput() throws Exception {
- GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
- new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsElementsIntoFixedWindows() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsElementsIntoSlidingWindows() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
- GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsIntoSessions() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
- public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-}