You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/18 18:54:23 UTC

[1/2] beam git commit: Revert "Removes final minor usages of OldDoFn outside OldDoFn itself"

Repository: beam
Updated Branches:
  refs/heads/master 8f4bb0158 -> 4124cc687


Revert "Removes final minor usages of OldDoFn outside OldDoFn itself"

This reverts commit a3b5f968c1ae2e4f712bfcf200a03d8d193fd90c.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d357aea7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d357aea7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d357aea7

Branch: refs/heads/master
Commit: d357aea77a9e7051fba3b5aead3c0782d9be6274
Parents: 8f4bb01
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Apr 18 11:37:36 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 11:37:36 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/AssignWindowsDoFn.java    |  78 +++++
 .../apache/beam/runners/core/DoFnAdapters.java  | 328 +++++++++++++++++++
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |  17 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   7 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  11 +-
 .../core/GroupAlsoByWindowsAggregators.java     |  28 --
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  46 +++
 .../core/LateDataDroppingDoFnRunner.java        |   3 +-
 ...roupAlsoByWindowViaOutputBufferDoFnTest.java |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |  27 +-
 .../beam/runners/core/ReduceFnTester.java       |   3 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   9 +-
 .../spark/translation/SparkAssignWindowFn.java  |   3 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   8 +-
 16 files changed, 495 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
new file mode 100644
index 0000000..bbf3574
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
+ * provided {@link WindowFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+@SystemDoFnInternal
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
+    implements RequiresWindowAccess {
+  private WindowFn<? super T, W> fn;
+
+  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
+    this.fn =
+        checkNotNull(
+            fn,
+            "%s provided to %s cannot be null",
+            WindowFn.class.getSimpleName(),
+            AssignWindowsDoFn.class.getSimpleName());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(final ProcessContext c) throws Exception {
+    Collection<W> windows =
+        ((WindowFn<T, W>) fn).assignWindows(
+            ((WindowFn<T, W>) fn).new AssignContext() {
+                @Override
+                public T element() {
+                  return c.element();
+                }
+
+                @Override
+                public Instant timestamp() {
+                  return c.timestamp();
+                }
+
+                @Override
+                public BoundedWindow window() {
+                  return Iterables.getOnlyElement(c.windowingInternals().windows());
+                }
+              });
+
+    c.windowingInternals()
+        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
new file mode 100644
index 0000000..66ad736
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AggregatorRetriever;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
+ *     DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+  /** Should not be instantiated. */
+  private DoFnAdapters() {}
+
+  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
+    if (signature.processElement().observesWindow()) {
+      return new WindowDoFnAdapter<>(fn);
+    } else {
+      return new SimpleDoFnAdapter<>(fn);
+    }
+  }
+
+  /**
+   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+   * OldDoFn}.
+   */
+  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+    private final DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(AggregatorRetriever.getDelegatingAggregators(fn));
+      this.fn = fn;
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      this.invoker.invokeSetup();
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      fn.prepareForProcessing();
+      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void teardown() throws Exception {
+      this.invoker.invokeTeardown();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+      invoker.invokeProcessElement(adapter);
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return fn.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(fn);
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+  }
+
+  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+      implements OldDoFn.RequiresWindowAccess {
+
+    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn);
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
+   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+   * unavailable.
+   */
+  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.Context context;
+
+    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+      fn.super();
+      this.context = context;
+      super.setupDelegateAggregators();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      context.output(tag, output);
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.outputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public BoundedWindow window() {
+      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
+      // should be unreachable.
+      throw new UnsupportedOperationException(
+          "Can only get the window in processElement; elsewhere there is no defined window.");
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Can only get a ProcessContext in processElement");
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
+   */
+  private static class ProcessContextAdapter<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+    private ProcessContextAdapter(
+        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+      fn.super();
+      this.context = context;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return context.sideInput(view);
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      context.output(tag, output);
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.outputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public InputT element() {
+      return context.element();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return context.timestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return context.pane();
+    }
+
+    @Override
+    public void updateWatermark(Instant watermark) {
+      throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return context.window();
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 06db6e1..b09ee08 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -105,7 +105,7 @@ public class DoFnRunners {
   /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
-   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
    */
   public static <K, InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 5bd7e2d..5508b2e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -17,34 +17,23 @@
  */
 package org.apache.beam.runners.core;
 
-import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER;
-import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER;
-
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 
 /**
- * The default batch {@link GroupAlsoByWindowsAggregators} implementation, if no specialized "fast
- * path" implementation is applicable.
+ * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
+ * implementation is applicable.
  */
 @SystemDoFnInternal
 public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
 
   private final WindowingStrategy<?, W> strategy;
   private final StateInternalsFactory<K> stateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index e6be93a..bf48df1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 
 /**
- * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
  * {@link ReduceFnRunner}.
  */
 @SystemDoFnInternal
@@ -46,10 +46,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
       createAggregator(
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 
   private final WindowingStrategy<Object, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index e146bfc..0cf6e2d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
- * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
  * {@link ReduceFnRunner}.
  */
 @SystemDoFnInternal
@@ -61,10 +61,9 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
       createAggregator(
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
   private final WindowingStrategy<Object, W> windowingStrategy;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
   private transient StateInternalsFactory<K> stateInternalsFactory;
@@ -145,6 +144,10 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
     reduceFnRunner.persist();
   }
 
+  public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
+    throw new RuntimeException("Not implement!");
+  }
+
   public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
     return droppedDueToLateness;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
deleted file mode 100644
index 7c4f252..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-
-/**
- * Standard aggregator names related to {@link GroupAlsoByWindow}.
- */
-public abstract class GroupAlsoByWindowsAggregators {
-  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
-  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
new file mode 100644
index 0000000..7e96136
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
+ * combining values.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+@SystemDoFnInternal
+public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index cdc7ce7..4d41527 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -22,7 +22,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowTracing;
@@ -33,7 +32,7 @@ import org.joda.time.Instant;
 
 /**
  * A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link DoFn}.
+ * a {@link KeyedWorkItem} input {@link OldDoFn}.
  *
  * <p>It expands windows before checking data lateness.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
index e725cd2..cb8d494 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
@@ -43,10 +43,10 @@ public class GroupAlsoByWindowViaOutputBufferDoFnTest {
 
     @Override
     public <W extends BoundedWindow>
-        GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+    GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
             WindowingStrategy<?, W> windowingStrategy,
             StateInternalsFactory<K> stateInternalsFactory) {
-      return new GroupAlsoByWindowViaOutputBufferDoFn<>(
+      return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
           windowingStrategy,
           stateInternalsFactory,
           SystemReduceFn.<K, InputT, W>buffering(inputCoder));

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index a5031b8..d0a8923 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -57,7 +57,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
- * Properties of {@link GroupAlsoByWindowsAggregators}.
+ * Properties of {@link GroupAlsoByWindowsDoFn}.
  *
  * <p>Some properties may not hold of some implementations, due to restrictions on the context in
  * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
@@ -66,13 +66,12 @@ import org.joda.time.Instant;
 public class GroupAlsoByWindowsProperties {
 
   /**
-   * A factory of {@link GroupAlsoByWindowsAggregators} so that the various properties can provide
-   * the appropriate windowing strategy under test.
+   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the
+   * appropriate windowing strategy under test.
    */
   public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
-    <W extends BoundedWindow>
-        GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> forStrategy(
-            WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
+    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy(
+        WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
   }
 
   /**
@@ -312,7 +311,7 @@ public class GroupAlsoByWindowsProperties {
   }
 
   /**
-   * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per
+   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
    * session window correctly according to the provided {@link CombineFn}.
    */
   public static void combinesElementsPerSession(
@@ -499,7 +498,7 @@ public class GroupAlsoByWindowsProperties {
   }
 
   /**
-   * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per
+   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
    * session window correctly according to the provided {@link CombineFn}.
    */
   public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
@@ -598,7 +597,7 @@ public class GroupAlsoByWindowsProperties {
 
   private static <K, InputT, OutputT, W extends BoundedWindow>
       List<WindowedValue<KV<K, OutputT>>> processElement(
-          GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn,
+          GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
           KV<K, Iterable<WindowedValue<InputT>>> element)
           throws Exception {
     TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element);
@@ -622,18 +621,18 @@ public class GroupAlsoByWindowsProperties {
   }
 
   /**
-   * A {@link GroupAlsoByWindowViaOutputBufferDoFn.ProcessContext} providing just enough context for
-   * a {@link GroupAlsoByWindowsAggregators} - namely, information about the element and output via
-   * {@link WindowingInternals}, but no side inputs/outputs and no normal output.
+   * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link
+   * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link
+   * WindowingInternals}, but no side inputs/outputs and no normal output.
    */
   private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow>
-      extends GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W>.ProcessContext {
+      extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext {
     private final PipelineOptions options = PipelineOptionsFactory.create();
     private final KV<K, Iterable<WindowedValue<InputT>>> element;
     private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>();
 
     private TestProcessContext(
-        GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn,
+        GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
         KV<K, Iterable<WindowedValue<InputT>>> element) {
       fn.super();
       this.element = element;

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 923b2c3..914550e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -113,8 +113,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   private boolean autoAdvanceOutputWatermark = true;
 
   private final InMemoryLongSumAggregator droppedDueToClosedWindow =
-      new InMemoryLongSumAggregator(
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+      new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
 
   /**
    * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index ce29709..ce7b12a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
@@ -146,10 +146,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
       reduceFn = SystemReduceFn.buffering(valueCoder);
       droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
           Sum.ofLongs());
       droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
-          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER,
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
           Sum.ofLongs());
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1b40613..029c28a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -23,8 +23,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.LateDataUtils;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -77,7 +76,7 @@ import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 
 /**
- * An implementation of {@link GroupAlsoByWindow}
+ * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
  * logic for grouping by windows and controlling trigger firings and pane accumulation.
  *
  * <p>This implementation is a composite of Spark transformations revolving around state management
@@ -209,9 +208,9 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         // use in memory Aggregators since Spark Accumulators are not resilient
         // in stateful operators, once done with this partition.
         final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
-            GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
         final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
-            GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
 
         AbstractIterator<
             Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
index 088b981..18a3dd8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
@@ -29,8 +29,7 @@ import org.joda.time.Instant;
 
 
 /**
- * An implementation of {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} for the Spark
- * runner.
+ * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner.
  */
 public class SparkAssignWindowFn<T, W extends BoundedWindow>
     implements Function<WindowedValue<T>, WindowedValue<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 85adca9..ccc0fa3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -21,8 +21,8 @@ package org.apache.beam.runners.spark.translation;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -48,7 +48,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.joda.time.Instant;
 
 /**
- * An implementation of {@link GroupAlsoByWindow}
+ * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
  * for the Spark runner.
  */
 public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>
@@ -75,7 +75,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
 
     droppedDueToClosedWindow = runtimeContext.createAggregator(
         accumulator,
-        GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+        GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
         Sum.ofLongs());
   }
 


[2/2] beam git commit: This closes #2580

Posted by jk...@apache.org.
This closes #2580


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4124cc68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4124cc68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4124cc68

Branch: refs/heads/master
Commit: 4124cc6878d3b28a6a6367ef8cf854f76c85d9ef
Parents: 8f4bb01 d357aea
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Apr 18 11:53:56 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 11:53:56 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/AssignWindowsDoFn.java    |  78 +++++
 .../apache/beam/runners/core/DoFnAdapters.java  | 328 +++++++++++++++++++
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |  17 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   7 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  11 +-
 .../core/GroupAlsoByWindowsAggregators.java     |  28 --
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  46 +++
 .../core/LateDataDroppingDoFnRunner.java        |   3 +-
 ...roupAlsoByWindowViaOutputBufferDoFnTest.java |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |  27 +-
 .../beam/runners/core/ReduceFnTester.java       |   3 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   9 +-
 .../spark/translation/SparkAssignWindowFn.java  |   3 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   8 +-
 16 files changed, 495 insertions(+), 85 deletions(-)
----------------------------------------------------------------------