You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/15 07:07:14 UTC

[1/3] beam git commit: [BEAM-1915] Removes use of OldDoFn from Apex

Repository: beam
Updated Branches:
  refs/heads/master fdbadfc9c -> f7d727c0f


[BEAM-1915] Removes use of OldDoFn from Apex

This is the last occurrence of OldDoFn in the Beam repository
outside of OldDoFn itself.

OldDoFn is also still used in the Dataflow worker; the class can be
deleted entirely once we (the Dataflow team) migrate that usage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e243881
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e243881
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e243881

Branch: refs/heads/master
Commit: 3e243881fe767cf30869abf5c745c26f96d66fc4
Parents: fdbadfc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 10 22:51:16 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 14 23:34:11 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 225 ++++++-------------
 1 file changed, 63 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3e243881/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1697921..7d17ac6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -42,32 +42,29 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.WindowingInternals;
+import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -98,8 +95,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   private final StateInternalsFactory<K> stateInternalsFactory;
   private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
 
-  private transient ProcessContext context;
-  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
   private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
   private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
@@ -161,16 +156,53 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   @Override
   public void setup(OperatorContext context) {
     this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
-    StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
-    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
-        stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
-    this.context = new ProcessContext(fn, this.timerInternals);
   }
 
   @Override
   public void teardown() {
   }
 
+
+  private ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> newReduceFnRunner(K key) {
+    return new ReduceFnRunner<>(
+        key,
+        windowingStrategy,
+        ExecutableTriggerStateMachine.create(
+            TriggerStateMachines.stateMachineForTrigger(
+                Triggers.toProto(windowingStrategy.getTrigger()))),
+        stateInternalsFactory.stateInternalsForKey(key),
+        timerInternals,
+        new OutputWindowedValue<KV<K, Iterable<V>>>() {
+          @Override
+          public void outputWindowedValue(
+              KV<K, Iterable<V>> output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            if (traceTuples) {
+              LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+            }
+            ApexGroupByKeyOperator.this.output.emit(
+                ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+          }
+
+          @Override
+          public <AdditionalOutputT> void outputWindowedValue(
+              TupleTag<AdditionalOutputT> tag,
+              AdditionalOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            throw new UnsupportedOperationException(
+                "GroupAlsoByWindow should not use side outputs");
+          }
+        },
+        NullSideInputReader.empty(),
+        null,
+        SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder),
+        serializedOptions.get());
+  }
+
   /**
    * Returns the list of timers that are ready to fire. These are the timers
    * that are registered to be triggered at a time before the current watermark.
@@ -212,13 +244,11 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         windowedValue.getTimestamp(),
         windowedValue.getWindows(),
         windowedValue.getPane());
-
-    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
-            kv.getKey(),
-            Collections.singletonList(updatedWindowedValue));
-
-    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-    fn.processElement(context);
+    timerInternals.setKey(kv.getKey());
+    ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
+        newReduceFnRunner(kv.getKey());
+    reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
+    reduceFnRunner.persist();
   }
 
   private StateInternals<K> getStateInternalsForKey(K key) {
@@ -265,158 +295,29 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     if (!timers.isEmpty()) {
       for (Slice keyBytes : timers.keySet()) {
         K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
-        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
-        context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-        fn.processElement(context);
+        timerInternals.setKey(key);
+        ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
+        reduceFnRunner.onTimers(timers.get(keyBytes));
+        reduceFnRunner.persist();
       }
     }
   }
 
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
-      KeyedWorkItem<K, V>>.ProcessContext {
-
-    private final ApexTimerInternals timerInternals;
-    private StateInternals<K> stateInternals;
-    private KeyedWorkItem<K, V> element;
-
-    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
-                          ApexTimerInternals timerInternals) {
-      function.super();
-      this.timerInternals = checkNotNull(timerInternals);
-    }
-
-    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
-      this.element = element;
-      this.stateInternals = stateForKey;
-    }
-
-    @Override
-    public KeyedWorkItem<K, V> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException(
-          "timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.get();
-    }
-
-    @Override
-    public void output(KV<K, Iterable<V>> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException(
-          "pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
-
-        @Override
-        public StateInternals<K> stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(
-            KV<K, Iterable<V>> output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          if (traceTuples) {
-            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
-          }
-          ApexGroupByKeyOperator.this.output.emit(
-              ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
-        }
-
-        @Override
-        public <AdditionalOutputT> void outputWindowedValue(
-            TupleTag<AdditionalOutputT> tag,
-            AdditionalOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException(
-              "GroupAlsoByWindow should not use tagged outputs");
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      throw new RuntimeException("output() is not available when grouping by window.");
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      output(tag, output);
-    }
-
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   /**
    * An implementation of Beam's {@link TimerInternals}.
    *
    */
   private class ApexTimerInternals implements TimerInternals {
+    private K key;
+
+    public void setKey(K key) {
+      this.key = key;
+    }
 
     @Deprecated
     @Override
     public void setTimer(TimerData timerData) {
-      registerActiveTimer(context.element().key(), timerData);
+      registerActiveTimer(key, timerData);
     }
 
     @Override
@@ -427,7 +328,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     @Deprecated
     @Override
     public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
+      unregisterActiveTimer(key, timerKey);
     }
 
     @Override


[2/3] beam git commit: Removes final minor usages of OldDoFn outside OldDoFn itself

Posted by jk...@apache.org.
Removes final minor usages of OldDoFn outside OldDoFn itself


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a3b5f968
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a3b5f968
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a3b5f968

Branch: refs/heads/master
Commit: a3b5f968c1ae2e4f712bfcf200a03d8d193fd90c
Parents: 3e24388
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Apr 11 15:06:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 14 23:34:49 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/AssignWindowsDoFn.java    |  78 -----
 .../apache/beam/runners/core/DoFnAdapters.java  | 328 -------------------
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |  17 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   7 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  11 +-
 .../core/GroupAlsoByWindowsAggregators.java     |  28 ++
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  46 ---
 .../core/LateDataDroppingDoFnRunner.java        |   3 +-
 ...roupAlsoByWindowViaOutputBufferDoFnTest.java |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |  27 +-
 .../beam/runners/core/ReduceFnTester.java       |   3 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   9 +-
 .../spark/translation/SparkAssignWindowFn.java  |   3 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   8 +-
 16 files changed, 85 insertions(+), 495 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
deleted file mode 100644
index bbf3574..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
- * provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
-    implements RequiresWindowAccess {
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
-    this.fn =
-        checkNotNull(
-            fn,
-            "%s provided to %s cannot be null",
-            WindowFn.class.getSimpleName(),
-            AssignWindowsDoFn.class.getSimpleName());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void processElement(final ProcessContext c) throws Exception {
-    Collection<W> windows =
-        ((WindowFn<T, W>) fn).assignWindows(
-            ((WindowFn<T, W>) fn).new AssignContext() {
-                @Override
-                public T element() {
-                  return c.element();
-                }
-
-                @Override
-                public Instant timestamp() {
-                  return c.timestamp();
-                }
-
-                @Override
-                public BoundedWindow window() {
-                  return Iterables.getOnlyElement(c.windowingInternals().windows());
-                }
-              });
-
-    c.windowingInternals()
-        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
deleted file mode 100644
index 66ad736..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.IOException;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- *     DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
-  /** Should not be instantiated. */
-  private DoFnAdapters() {}
-
-  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
-    if (signature.processElement().observesWindow()) {
-      return new WindowDoFnAdapter<>(fn);
-    } else {
-      return new SimpleDoFnAdapter<>(fn);
-    }
-  }
-
-  /**
-   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
-   * OldDoFn}.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-    private final DoFn<InputT, OutputT> fn;
-    private transient DoFnInvoker<InputT, OutputT> invoker;
-
-    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(AggregatorRetriever.getDelegatingAggregators(fn));
-      this.fn = fn;
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-
-    @Override
-    public void setup() throws Exception {
-      this.invoker.invokeSetup();
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      fn.prepareForProcessing();
-      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void teardown() throws Exception {
-      this.invoker.invokeTeardown();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
-      invoker.invokeProcessElement(adapter);
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return fn.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(fn);
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-  }
-
-  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
-  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
-      implements OldDoFn.RequiresWindowAccess {
-
-    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn);
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
-   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
-   * unavailable.
-   */
-  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
-      fn.super();
-      this.context = context;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      context.output(tag, output);
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.outputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name,
-        CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the window in processElement; elsewhere there is no defined window.");
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a ProcessContext in processElement");
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public RestrictionTracker<?> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      context.output(tag, output);
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.outputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public void updateWatermark(Instant watermark) {
-      throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public RestrictionTracker<?> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index b09ee08..06db6e1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -105,7 +105,7 @@ public class DoFnRunners {
   /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
-   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
+   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
    */
   public static <K, InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 5508b2e..5bd7e2d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -17,23 +17,34 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER;
+import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER;
+
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 
 /**
- * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
- * implementation is applicable.
+ * The default batch {@link GroupAlsoByWindowsAggregators} implementation, if no specialized "fast
+ * path" implementation is applicable.
  */
 @SystemDoFnInternal
 public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 
   private final WindowingStrategy<?, W> strategy;
   private final StateInternalsFactory<K> stateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index bf48df1..e6be93a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 
 /**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the
  * {@link ReduceFnRunner}.
  */
 @SystemDoFnInternal
@@ -46,9 +46,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
       createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+      createAggregator(
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 
   private final WindowingStrategy<Object, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 0cf6e2d..e146bfc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the
  * {@link ReduceFnRunner}.
  */
 @SystemDoFnInternal
@@ -61,9 +61,10 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
       createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+      createAggregator(
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
   private final WindowingStrategy<Object, W> windowingStrategy;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
   private transient StateInternalsFactory<K> stateInternalsFactory;
@@ -144,10 +145,6 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
     reduceFnRunner.persist();
   }
 
-  public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
-    throw new RuntimeException("Not implement!");
-  }
-
   public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
     return droppedDueToLateness;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
new file mode 100644
index 0000000..7c4f252
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+
+/**
+ * Standard aggregator names related to {@link GroupAlsoByWindow}.
+ */
+public abstract class GroupAlsoByWindowsAggregators {
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
deleted file mode 100644
index 7e96136..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
- * combining values.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-@SystemDoFnInternal
-public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
-  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
-  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 4d41527..cdc7ce7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowTracing;
@@ -32,7 +33,7 @@ import org.joda.time.Instant;
 
 /**
  * A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link OldDoFn}.
+ * a {@link KeyedWorkItem} input {@link DoFn}.
  *
  * <p>It expands windows before checking data lateness.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
index cb8d494..e725cd2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
@@ -43,10 +43,10 @@ public class GroupAlsoByWindowViaOutputBufferDoFnTest {
 
     @Override
     public <W extends BoundedWindow>
-    GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+        GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
             WindowingStrategy<?, W> windowingStrategy,
             StateInternalsFactory<K> stateInternalsFactory) {
-      return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
+      return new GroupAlsoByWindowViaOutputBufferDoFn<>(
           windowingStrategy,
           stateInternalsFactory,
           SystemReduceFn.<K, InputT, W>buffering(inputCoder));

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d0a8923..a5031b8 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -57,7 +57,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
- * Properties of {@link GroupAlsoByWindowsDoFn}.
+ * Properties of {@link GroupAlsoByWindowsAggregators}.
  *
  * <p>Some properties may not hold of some implementations, due to restrictions on the context in
  * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
@@ -66,12 +66,13 @@ import org.joda.time.Instant;
 public class GroupAlsoByWindowsProperties {
 
   /**
-   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the
-   * appropriate windowing strategy under test.
+   * A factory of {@link GroupAlsoByWindowsAggregators} so that the various properties can provide
+   * the appropriate windowing strategy under test.
    */
   public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
-    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy(
-        WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
+    <W extends BoundedWindow>
+        GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> forStrategy(
+            WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
   }
 
   /**
@@ -311,7 +312,7 @@ public class GroupAlsoByWindowsProperties {
   }
 
   /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
+   * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per
    * session window correctly according to the provided {@link CombineFn}.
    */
   public static void combinesElementsPerSession(
@@ -498,7 +499,7 @@ public class GroupAlsoByWindowsProperties {
   }
 
   /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
+   * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per
    * session window correctly according to the provided {@link CombineFn}.
    */
   public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
@@ -597,7 +598,7 @@ public class GroupAlsoByWindowsProperties {
 
   private static <K, InputT, OutputT, W extends BoundedWindow>
       List<WindowedValue<KV<K, OutputT>>> processElement(
-          GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+          GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn,
           KV<K, Iterable<WindowedValue<InputT>>> element)
           throws Exception {
     TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element);
@@ -621,18 +622,18 @@ public class GroupAlsoByWindowsProperties {
   }
 
   /**
-   * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link
-   * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link
-   * WindowingInternals}, but no side inputs/outputs and no normal output.
+   * A {@link GroupAlsoByWindowViaOutputBufferDoFn.ProcessContext} providing just enough context for
+   * a {@link GroupAlsoByWindowsAggregators} - namely, information about the element and output via
+   * {@link WindowingInternals}, but no side inputs/outputs and no normal output.
    */
   private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow>
-      extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext {
+      extends GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W>.ProcessContext {
     private final PipelineOptions options = PipelineOptionsFactory.create();
     private final KV<K, Iterable<WindowedValue<InputT>>> element;
     private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>();
 
     private TestProcessContext(
-        GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+        GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn,
         KV<K, Iterable<WindowedValue<InputT>>> element) {
       fn.super();
       this.element = element;

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 914550e..923b2c3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -113,7 +113,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   private boolean autoAdvanceOutputWatermark = true;
 
   private final InMemoryLongSumAggregator droppedDueToClosedWindow =
-      new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+      new InMemoryLongSumAggregator(
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
 
   /**
    * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index ce7b12a..ce29709 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
@@ -146,10 +146,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
       reduceFn = SystemReduceFn.buffering(valueCoder);
       droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
           Sum.ofLongs());
       droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
+          GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER,
           Sum.ofLongs());
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 029c28a..1b40613 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -23,7 +23,8 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.LateDataUtils;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -76,7 +77,7 @@ import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 
 /**
- * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
+ * An implementation of {@link GroupAlsoByWindow}
  * logic for grouping by windows and controlling trigger firings and pane accumulation.
  *
  * <p>This implementation is a composite of Spark transformations revolving around state management
@@ -208,9 +209,9 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         // use in memory Aggregators since Spark Accumulators are not resilient
         // in stateful operators, once done with this partition.
         final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
-            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+            GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
         final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
-            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+            GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
 
         AbstractIterator<
             Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
index 18a3dd8..088b981 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
@@ -29,7 +29,8 @@ import org.joda.time.Instant;
 
 
 /**
- * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner.
+ * An implementation of {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} for the Spark
+ * runner.
  */
 public class SparkAssignWindowFn<T, W extends BoundedWindow>
     implements Function<WindowedValue<T>, WindowedValue<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index ccc0fa3..85adca9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -21,8 +21,8 @@ package org.apache.beam.runners.spark.translation;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -48,7 +48,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.joda.time.Instant;
 
 /**
- * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
+ * An implementation of {@link GroupAlsoByWindow}
  * for the Spark runner.
  */
 public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>
@@ -75,7 +75,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
 
     droppedDueToClosedWindow = runtimeContext.createAggregator(
         accumulator,
-        GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+        GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
         Sum.ofLongs());
   }
 


[3/3] beam git commit: This closes #2500

Posted by jk...@apache.org.
This closes #2500


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7d727c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7d727c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7d727c0

Branch: refs/heads/master
Commit: f7d727c0f7ada7a162f5cb73f658ce52f094dd86
Parents: fdbadfc a3b5f96
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 14 23:54:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 14 23:54:59 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 225 ++++---------
 .../beam/runners/core/AssignWindowsDoFn.java    |  78 -----
 .../apache/beam/runners/core/DoFnAdapters.java  | 328 -------------------
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |  17 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   7 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  11 +-
 .../core/GroupAlsoByWindowsAggregators.java     |  28 ++
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  46 ---
 .../core/LateDataDroppingDoFnRunner.java        |   3 +-
 ...roupAlsoByWindowViaOutputBufferDoFnTest.java |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |  27 +-
 .../beam/runners/core/ReduceFnTester.java       |   3 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   9 +-
 .../spark/translation/SparkAssignWindowFn.java  |   3 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   8 +-
 17 files changed, 148 insertions(+), 657 deletions(-)
----------------------------------------------------------------------