You are viewing a plain text version of this content. The canonical link was not preserved in this text copy; see the commit URLs below.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:08 UTC
[13/50] [abbrv] beam git commit: Removes final minor usages of
OldDoFn outside OldDoFn itself
Removes final minor usages of OldDoFn outside OldDoFn itself
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a3b5f968
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a3b5f968
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a3b5f968
Branch: refs/heads/jstorm-runner
Commit: a3b5f968c1ae2e4f712bfcf200a03d8d193fd90c
Parents: 3e24388
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Apr 11 15:06:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 14 23:34:49 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/AssignWindowsDoFn.java | 78 -----
.../apache/beam/runners/core/DoFnAdapters.java | 328 -------------------
.../apache/beam/runners/core/DoFnRunners.java | 2 +-
.../GroupAlsoByWindowViaOutputBufferDoFn.java | 17 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 7 +-
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 11 +-
.../core/GroupAlsoByWindowsAggregators.java | 28 ++
.../runners/core/GroupAlsoByWindowsDoFn.java | 46 ---
.../core/LateDataDroppingDoFnRunner.java | 3 +-
...roupAlsoByWindowViaOutputBufferDoFnTest.java | 4 +-
.../core/GroupAlsoByWindowsProperties.java | 27 +-
.../beam/runners/core/ReduceFnTester.java | 3 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 6 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 9 +-
.../spark/translation/SparkAssignWindowFn.java | 3 +-
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 8 +-
16 files changed, 85 insertions(+), 495 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
deleted file mode 100644
index bbf3574..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
- * provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
- implements RequiresWindowAccess {
- private WindowFn<? super T, W> fn;
-
- public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
- this.fn =
- checkNotNull(
- fn,
- "%s provided to %s cannot be null",
- WindowFn.class.getSimpleName(),
- AssignWindowsDoFn.class.getSimpleName());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void processElement(final ProcessContext c) throws Exception {
- Collection<W> windows =
- ((WindowFn<T, W>) fn).assignWindows(
- ((WindowFn<T, W>) fn).new AssignContext() {
- @Override
- public T element() {
- return c.element();
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(c.windowingInternals().windows());
- }
- });
-
- c.windowingInternals()
- .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
deleted file mode 100644
index 66ad736..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.IOException;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- * DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
- /** Should not be instantiated. */
- private DoFnAdapters() {}
-
- /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
- @SuppressWarnings({"unchecked", "rawtypes"})
- public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
- DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
- if (signature.processElement().observesWindow()) {
- return new WindowDoFnAdapter<>(fn);
- } else {
- return new SimpleDoFnAdapter<>(fn);
- }
- }
-
- /**
- * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
- * OldDoFn}.
- */
- private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
- private final DoFn<InputT, OutputT> fn;
- private transient DoFnInvoker<InputT, OutputT> invoker;
-
- SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
- super(AggregatorRetriever.getDelegatingAggregators(fn));
- this.fn = fn;
- this.invoker = DoFnInvokers.invokerFor(fn);
- }
-
- @Override
- public void setup() throws Exception {
- this.invoker.invokeSetup();
- }
-
- @Override
- public void startBundle(Context c) throws Exception {
- fn.prepareForProcessing();
- invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
- }
-
- @Override
- public void finishBundle(Context c) throws Exception {
- invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
- }
-
- @Override
- public void teardown() throws Exception {
- this.invoker.invokeTeardown();
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
- invoker.invokeProcessElement(adapter);
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return fn.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(fn);
- }
-
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- this.invoker = DoFnInvokers.invokerFor(fn);
- }
- }
-
- /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
- private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
- implements OldDoFn.RequiresWindowAccess {
-
- WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
- super(fn);
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
- * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
- * unavailable.
- */
- private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.Context context;
-
- private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
- fn.super();
- this.context = context;
- super.setupDelegateAggregators();
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- context.output(tag, output);
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.outputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
- String name,
- CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
-
- @Override
- public BoundedWindow window() {
- // The OldDoFn doesn't allow us to ask for these outside processElement, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get the window in processElement; elsewhere there is no defined window.");
- }
-
- @Override
- public Context context(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Can only get a ProcessContext in processElement");
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Timers are not supported for OldDoFn");
- }
-
- @Override
- public RestrictionTracker<?> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
- */
- private static class ProcessContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.ProcessContext context;
-
- private ProcessContextAdapter(
- DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return context.sideInput(view);
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- context.output(tag, output);
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.outputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
-
- @Override
- public InputT element() {
- return context.element();
- }
-
- @Override
- public Instant timestamp() {
- return context.timestamp();
- }
-
- @Override
- public PaneInfo pane() {
- return context.pane();
- }
-
- @Override
- public void updateWatermark(Instant watermark) {
- throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
- }
-
- @Override
- public BoundedWindow window() {
- return context.window();
- }
-
- @Override
- public Context context(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
- }
-
- @Override
- public RestrictionTracker<?> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index b09ee08..06db6e1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -105,7 +105,7 @@ public class DoFnRunners {
/**
* Returns an implementation of {@link DoFnRunner} that handles late data dropping.
*
- * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
+ * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
*/
public static <K, InputT, OutputT, W extends BoundedWindow>
DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 5508b2e..5bd7e2d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -17,23 +17,34 @@
*/
package org.apache.beam.runners.core;
+import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER;
+import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER;
+
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
/**
- * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
- * implementation is applicable.
+ * The default batch {@link OldDoFn} that merges windows and groups elements in those windows,
+ * used if no specialized "fast path" implementation is applicable.
*/
@SystemDoFnInternal
public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
- extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+ extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+ protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+ createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+ protected final Aggregator<Long, Long> droppedDueToLateness =
+ createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
private final WindowingStrategy<?, W> strategy;
private final StateInternalsFactory<K> stateInternalsFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index bf48df1..e6be93a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * A general group-also-by-window implementation. This delegates all of the logic to the
* {@link ReduceFnRunner}.
*/
@SystemDoFnInternal
@@ -46,9 +46,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+ createAggregator(
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
private final WindowingStrategy<Object, W> windowingStrategy;
private final StateInternalsFactory<K> stateInternalsFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 0cf6e2d..e146bfc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * A general group-also-by-window implementation. This delegates all of the logic to the
* {@link ReduceFnRunner}.
*/
@SystemDoFnInternal
@@ -61,9 +61,10 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+ createAggregator(
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
private final WindowingStrategy<Object, W> windowingStrategy;
private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
private transient StateInternalsFactory<K> stateInternalsFactory;
@@ -144,10 +145,6 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
reduceFnRunner.persist();
}
- public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
- throw new RuntimeException("Not implement!");
- }
-
public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
return droppedDueToLateness;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
new file mode 100644
index 0000000..7c4f252
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+
+/**
+ * Standard aggregator names related to {@link GroupAlsoByWindow}.
+ */
+public abstract class GroupAlsoByWindowsAggregators {
+ public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+ public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
deleted file mode 100644
index 7e96136..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
- * combining values.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-@SystemDoFnInternal
-public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
- extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
- public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
- public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
- protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
- protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 4d41527..cdc7ce7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowTracing;
@@ -32,7 +33,7 @@ import org.joda.time.Instant;
/**
* A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link OldDoFn}.
+ * a {@link KeyedWorkItem} input {@link DoFn}.
*
* <p>It expands windows before checking data lateness.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
index cb8d494..e725cd2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
@@ -43,10 +43,10 @@ public class GroupAlsoByWindowViaOutputBufferDoFnTest {
@Override
public <W extends BoundedWindow>
- GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+ GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
WindowingStrategy<?, W> windowingStrategy,
StateInternalsFactory<K> stateInternalsFactory) {
- return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
+ return new GroupAlsoByWindowViaOutputBufferDoFn<>(
windowingStrategy,
stateInternalsFactory,
SystemReduceFn.<K, InputT, W>buffering(inputCoder));
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d0a8923..a5031b8 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -57,7 +57,7 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
/**
- * Properties of {@link GroupAlsoByWindowsDoFn}.
+ * Properties of {@link GroupAlsoByWindowViaOutputBufferDoFn}.
*
* <p>Some properties may not hold of some implementations, due to restrictions on the context in
* which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
@@ -66,12 +66,13 @@ import org.joda.time.Instant;
public class GroupAlsoByWindowsProperties {
/**
- * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the
- * appropriate windowing strategy under test.
+ * A factory of {@link GroupAlsoByWindowViaOutputBufferDoFn} instances so that the various
+ * properties can provide the appropriate windowing strategy under test.
*/
public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
- <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy(
- WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
+ <W extends BoundedWindow>
+ GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> forStrategy(
+ WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
}
/**
@@ -311,7 +312,7 @@ public class GroupAlsoByWindowsProperties {
}
/**
- * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
+ * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per
* session window correctly according to the provided {@link CombineFn}.
*/
public static void combinesElementsPerSession(
@@ -498,7 +499,7 @@ public class GroupAlsoByWindowsProperties {
}
/**
- * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
+ * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per
* session window correctly according to the provided {@link CombineFn}.
*/
public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
@@ -597,7 +598,7 @@ public class GroupAlsoByWindowsProperties {
private static <K, InputT, OutputT, W extends BoundedWindow>
List<WindowedValue<KV<K, OutputT>>> processElement(
- GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+ GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn,
KV<K, Iterable<WindowedValue<InputT>>> element)
throws Exception {
TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element);
@@ -621,18 +622,18 @@ public class GroupAlsoByWindowsProperties {
}
/**
- * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link
- * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link
- * WindowingInternals}, but no side inputs/outputs and no normal output.
+ * A {@link GroupAlsoByWindowViaOutputBufferDoFn.ProcessContext} providing just enough context for
+ * a {@link GroupAlsoByWindowsAggregators} - namely, information about the element and output via
+ * {@link WindowingInternals}, but no side inputs/outputs and no normal output.
*/
private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow>
- extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext {
+ extends GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W>.ProcessContext {
private final PipelineOptions options = PipelineOptionsFactory.create();
private final KV<K, Iterable<WindowedValue<InputT>>> element;
private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>();
private TestProcessContext(
- GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+ GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn,
KV<K, Iterable<WindowedValue<InputT>>> element) {
fn.super();
this.element = element;
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 914550e..923b2c3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -113,7 +113,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
private boolean autoAdvanceOutputWatermark = true;
private final InMemoryLongSumAggregator droppedDueToClosedWindow =
- new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+ new InMemoryLongSumAggregator(
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index ce7b12a..ce29709 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
@@ -146,10 +146,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
reduceFn = SystemReduceFn.buffering(valueCoder);
droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
Sum.ofLongs());
droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER,
Sum.ofLongs());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 029c28a..1b40613 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -23,7 +23,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
@@ -76,7 +77,7 @@ import scala.reflect.ClassTag;
import scala.runtime.AbstractFunction1;
/**
- * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
+ * An implementation of {@link GroupAlsoByWindow}
* logic for grouping by windows and controlling trigger firings and pane accumulation.
*
* <p>This implementation is a composite of Spark transformations revolving around state management
@@ -208,9 +209,9 @@ public class SparkGroupAlsoByWindowViaWindowSet {
// use in memory Aggregators since Spark Accumulators are not resilient
// in stateful operators, once done with this partition.
final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
AbstractIterator<
Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
index 18a3dd8..088b981 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
@@ -29,7 +29,8 @@ import org.joda.time.Instant;
/**
- * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner.
+ * An implementation of {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} for the Spark
+ * runner.
*/
public class SparkAssignWindowFn<T, W extends BoundedWindow>
implements Function<WindowedValue<T>, WindowedValue<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index ccc0fa3..85adca9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -21,8 +21,8 @@ package org.apache.beam.runners.spark.translation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
@@ -48,7 +48,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
import org.joda.time.Instant;
/**
- * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
+ * An implementation of {@link GroupAlsoByWindow}
* for the Spark runner.
*/
public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>
@@ -75,7 +75,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
droppedDueToClosedWindow = runtimeContext.createAggregator(
accumulator,
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+ GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
Sum.ofLongs());
}