You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:35:23 UTC

[40/50] [abbrv] beam git commit: Removes OldDoFn and its kin from runners-core

Removes OldDoFn and its kin from runners-core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6127f532
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6127f532
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6127f532

Branch: refs/heads/jstorm-runner
Commit: 6127f532b7e6ad0f13926d3d9aec17eb538108ed
Parents: 00b4a30
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri May 12 11:18:57 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 16 11:55:43 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/AssignWindowsDoFn.java    |  78 --
 .../apache/beam/runners/core/DoFnAdapters.java  | 381 ----------
 .../apache/beam/runners/core/DoFnRunners.java   |  27 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   | 113 ---
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  94 ---
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   6 +-
 .../core/GroupAlsoByWindowsAggregators.java     |  26 +
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  39 -
 .../core/LateDataDroppingDoFnRunner.java        |   3 +-
 .../org/apache/beam/runners/core/OldDoFn.java   | 335 ---------
 .../beam/runners/core/SimpleOldDoFnRunner.java  | 499 -------------
 .../core/WindowingInternalsAdapters.java        |  74 --
 ...roupAlsoByWindowViaOutputBufferDoFnTest.java | 109 ---
 .../core/GroupAlsoByWindowsProperties.java      | 744 -------------------
 .../apache/beam/runners/core/NoOpOldDoFn.java   |  65 --
 .../apache/beam/runners/core/OldDoFnTest.java   |  51 --
 .../runners/core/SimpleOldDoFnRunnerTest.java   |  86 ---
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   9 +-
 .../spark/translation/SparkAssignWindowFn.java  |   4 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   4 +-
 21 files changed, 43 insertions(+), 2710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
deleted file mode 100644
index bbf3574..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
- * provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
-    implements RequiresWindowAccess {
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
-    this.fn =
-        checkNotNull(
-            fn,
-            "%s provided to %s cannot be null",
-            WindowFn.class.getSimpleName(),
-            AssignWindowsDoFn.class.getSimpleName());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void processElement(final ProcessContext c) throws Exception {
-    Collection<W> windows =
-        ((WindowFn<T, W>) fn).assignWindows(
-            ((WindowFn<T, W>) fn).new AssignContext() {
-                @Override
-                public T element() {
-                  return c.element();
-                }
-
-                @Override
-                public Instant timestamp() {
-                  return c.timestamp();
-                }
-
-                @Override
-                public BoundedWindow window() {
-                  return Iterables.getOnlyElement(c.windowingInternals().windows());
-                }
-              });
-
-    c.windowingInternals()
-        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
deleted file mode 100644
index af59a40..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.IOException;
-import org.apache.beam.runners.core.OldDoFn.Context;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- *     DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
-  /** Should not be instantiated. */
-  private DoFnAdapters() {}
-
-  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
-    if (signature.processElement().observesWindow()) {
-      return new WindowDoFnAdapter<>(fn);
-    } else {
-      return new SimpleDoFnAdapter<>(fn);
-    }
-  }
-
-  /**
-   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
-   * OldDoFn}.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-    private final DoFn<InputT, OutputT> fn;
-    private transient DoFnInvoker<InputT, OutputT> invoker;
-
-    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      this.fn = fn;
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-
-    @Override
-    public void setup() throws Exception {
-      this.invoker.invokeSetup();
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      fn.prepareForProcessing();
-      invoker.invokeStartBundle(new StartBundleContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      invoker.invokeFinishBundle(new FinishBundleContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void teardown() throws Exception {
-      this.invoker.invokeTeardown();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
-      invoker.invokeProcessElement(adapter);
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return fn.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(fn);
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-  }
-
-  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
-  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
-      implements OldDoFn.RequiresWindowAccess {
-
-    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn);
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
-   * DoFn.StartBundle} method, which means the extra context is unavailable.
-   */
-  private static class StartBundleContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.StartBundleContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private StartBundleContextAdapter(DoFn<InputT, OutputT> fn, Context context) {
-      fn.super();
-      this.context = context;
-    }
-
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the window in processElement; elsewhere there is no defined window.");
-    }
-
-    @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a FinishBundleContext in finishBundle");
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a ProcessContext in processElement");
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public RestrictionTracker<?> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
-   * DoFn.FinishBundle} method, which means the extra context is unavailable.
-   */
-  private static class FinishBundleContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.FinishBundleContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private FinishBundleContextAdapter(DoFn<InputT, OutputT> fn, Context context) {
-      fn.super();
-      this.context = context;
-    }
-
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the window in processElement; elsewhere there is no defined window.");
-    }
-
-    @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a StartBundleContext in startBundle");
-    }
-
-    @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a ProcessContext in processElement");
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public RestrictionTracker<?> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-
-    @Override
-    public void output(
-        OutputT output, Instant timestamp, BoundedWindow window) {
-      // Not full fidelity conversion. This should be removed as soon as possible.
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void output(
-        TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
-      // Not full fidelity conversion. This should be removed as soon as possible.
-      context.outputWithTimestamp(tag, output, timestamp);
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      context.output(tag, output);
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.outputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public void updateWatermark(Instant watermark) {
-      throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
-      return null;
-    }
-
-    @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
-      return null;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public RestrictionTracker<?> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index ee3aefa..f3cca6f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -74,34 +74,9 @@ public class DoFnRunners {
   }
 
   /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}.
-   *
-   * <p>It invokes {@link OldDoFn#processElement} for each input.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> additionalOutputTags,
-      StepContext stepContext,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return new SimpleOldDoFnRunner<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        additionalOutputTags,
-        stepContext,
-        windowingStrategy);
-  }
-
-  /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
-   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
+   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
    */
   public static <K, InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
deleted file mode 100644
index a160553..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.construction.Triggers;
-import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
-import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Instant;
-
-/**
- * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
- * implementation is applicable.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
-
-  private final WindowingStrategy<?, W> strategy;
-  private final StateInternalsFactory<K> stateInternalsFactory;
-  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
-  public GroupAlsoByWindowViaOutputBufferDoFn(
-      WindowingStrategy<?, W> windowingStrategy,
-      StateInternalsFactory<K> stateInternalsFactory,
-      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    this.strategy = windowingStrategy;
-    this.reduceFn = reduceFn;
-    this.stateInternalsFactory = stateInternalsFactory;
-  }
-
-  @Override
-  public void processElement(ProcessContext c) throws Exception {
-    K key = c.element().getKey();
-    // Used with Batch, we know that all the data is available for this key. We can't use the
-    // timer manager from the context because it doesn't exist. So we create one and emulate the
-    // watermark, knowing that we have all data and it is in timestamp order.
-    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-    timerInternals.advanceProcessingTime(Instant.now());
-    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-
-    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
-        new ReduceFnRunner<>(
-            key,
-            strategy,
-            ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(
-                    Triggers.toProto(strategy.getTrigger()))),
-            stateInternals,
-            timerInternals,
-            WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
-            WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
-            reduceFn,
-            c.getPipelineOptions());
-
-    // Process the elements.
-    reduceFnRunner.processElements(c.element().getValue());
-
-    // Finish any pending windows by advancing the input watermark to infinity.
-    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    // Finally, advance the processing time to infinity to fire any timers.
-    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    fireEligibleTimers(timerInternals, reduceFnRunner);
-
-    reduceFnRunner.persist();
-  }
-
-  private void fireEligibleTimers(
-      InMemoryTimerInternals timerInternals, ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner)
-      throws Exception {
-    List<TimerInternals.TimerData> timers = new ArrayList<>();
-    while (true) {
-      TimerInternals.TimerData timer;
-      while ((timer = timerInternals.removeNextEventTimer()) != null) {
-        timers.add(timer);
-      }
-      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-        timers.add(timer);
-      }
-      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
-        timers.add(timer);
-      }
-      if (timers.isEmpty()) {
-        break;
-      }
-      reduceFnRunner.onTimers(timers);
-      timers.clear();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
deleted file mode 100644
index 2342c52..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.runners.core.construction.Triggers;
-import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
-import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
- * {@link ReduceFnRunner}.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowViaWindowSetDoFn<
-        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
-    extends OldDoFn<RinT, KV<K, OutputT>> {
-
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-      OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
-          WindowingStrategy<?, W> strategy,
-          StateInternalsFactory<K> stateInternalsFactory,
-          SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
-  }
-
-  private final WindowingStrategy<Object, W> windowingStrategy;
-  private final StateInternalsFactory<K> stateInternalsFactory;
-  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
-  private GroupAlsoByWindowViaWindowSetDoFn(
-      WindowingStrategy<?, W> windowingStrategy,
-      StateInternalsFactory<K> stateInternalsFactory,
-      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
-    this.windowingStrategy = noWildcard;
-    this.reduceFn = reduceFn;
-    this.stateInternalsFactory = stateInternalsFactory;
-  }
-
-  @Override
-  public void processElement(ProcessContext c) throws Exception {
-    KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
-
-    K key = keyedWorkItem.key();
-    TimerInternals timerInternals = c.windowingInternals().timerInternals();
-    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-
-    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
-        new ReduceFnRunner<>(
-            key,
-            windowingStrategy,
-            ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(
-                    Triggers.toProto(windowingStrategy.getTrigger()))),
-            stateInternals,
-            timerInternals,
-            WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
-            WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
-            reduceFn,
-            c.getPipelineOptions());
-
-    reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
-    reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
-    reduceFnRunner.persist();
-  }
-
-  public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
-    // Safe contravariant cast
-    @SuppressWarnings("unchecked")
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
-        (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
-    return asFn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 5b82d1f..744d162 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
 
 /**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the
  * {@link ReduceFnRunner}.
  */
 @SystemDoFnInternal
@@ -134,8 +134,4 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
     reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
     reduceFnRunner.persist();
   }
-
-  public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
-    throw new RuntimeException("Not implement!");
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
new file mode 100644
index 0000000..8d96257
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+/**
+ * Common aggregator names for {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow}.
+ */
+public abstract class GroupAlsoByWindowsAggregators {
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
deleted file mode 100644
index 2bd9ee0..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
- * combining values.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-@SystemDoFnInternal
-public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
-  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
-  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 570f524..1cf1509 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -33,7 +34,7 @@ import org.joda.time.Instant;
 
 /**
  * A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link OldDoFn}.
+ * a {@link KeyedWorkItem} input {@link DoFn}.
  *
  * <p>It expands windows before checking data lateness.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
deleted file mode 100644
index 41bb598..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code OldDoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code OldDoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output.  Unit testing of a {@code OldDoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFn} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}.
- */
-@Deprecated
-public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-  /**
-   * Information accessible to all methods in this {@code OldDoFn}.
-   * Used primarily to output elements.
-   */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link PipelineRunner}
-     * invoking this {@code OldDoFn}.  The {@code PipelineOptions} will
-     * be the default running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should be considered
-     * immutable and not be modified in any way. It may be cached or retained
-     * by a Beam runner or later steps in the pipeline, or used in
-     * other unspecified ways.
-     *
-     * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to {@link OldDoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code output} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags}
-     * to specify the tags of outputs that it consumes. Outputs that are not consumed, e.g., outputs
-     * for monitoring purposes only, don't necessarily need to be specified.
-     *
-     * <p>The output element will have the same timestamp and be in the same
-     * windows as the input element passed to {@link OldDoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo.SingleOutput#withOutputTags
-     */
-    public abstract <T> void output(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified output {@code PCollection}, with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way.
-     *
-     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp must not be
-     * older than the input element's timestamp minus {@link OldDoFn#getAllowedTimestampSkew
-     * getAllowedTimestampSkew}. The output element will be in the same windows as the input
-     * element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the
-     * input {@code PCollection} to determine what windows the element should be in, throwing an
-     * exception if the {@code WindowFn} attempts to access any information about the input element
-     * except for the timestamp.
-     *
-     * @see ParDo.SingleOutput#withOutputTags
-     */
-    public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
-  }
-
-  /**
-   * Information accessible when running {@link OldDoFn#processElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element should be considered immutable. A Beam runner will not mutate the
-     * element, so it is safe to cache, etc. The element should not be mutated by any of the
-     * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner
-     * runtime, or used in other unspecified ways.
-     */
-    public abstract InputT element();
-
-    /**
-     * Returns the value of the side input for the window corresponding to the
-     * window of the main input element.
-     *
-     * <p>See
-     * {@link WindowMappingFn#getSideInputWindow}
-     * for how this corresponding window is determined.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo.SingleOutput#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns the window into which the input element has been assigned.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     *
-     * @throws UnsupportedOperationException if this {@link OldDoFn} does
-     * not implement {@link RequiresWindowAccess}.
-     */
-    public abstract BoundedWindow window();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-
-    /**
-     * Returns the process context to use for implementing windowing.
-     */
-    @Experimental
-    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link OldDoFn.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward to future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   *
-   * <p>Note that producing an element whose timestamp is less than the
-   * current timestamp may result in late data, i.e. returning a non-zero
-   * value here does not impact watermark calculations used for firing
-   * windows.
-   *
-   * @deprecated does not interact well with the watermark.
-   */
-  @Deprecated
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /**
-   * Interface for signaling that a {@link OldDoFn} needs to access the window the
-   * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
-   */
-  @Experimental
-  public interface RequiresWindowAccess {}
-
-  public OldDoFn() {
-  }
-
-  /**
-   * Prepares this {@link DoFn} instance for processing bundles.
-   *
-   * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other
-   * {@link DoFn} method is called.
-   *
-   * <p>By default, does nothing.
-   */
-  public void setup() throws Exception {
-  }
-
-  /**
-   * Prepares this {@code OldDoFn} instance for processing a batch of elements.
-   *
-   * <p>By default, does nothing.
-   */
-  public void startBundle(Context c) throws Exception {
-  }
-
-  /**
-   * Processes one input element.
-   *
-   * <p>The current element of the input {@code PCollection} is returned by
-   * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam
-   * runner will not mutate the element, so it is safe to cache, etc. The element should not be
-   * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
-   * the Beam runner, or used in other unspecified ways.
-   *
-   * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
-   * Once passed to {@code output} the element should be considered immutable and not be modified in
-   * any way. It may be cached elsewhere, retained by the Beam runner, or used in other
-   * unspecified ways.
-   *
-   * @see ProcessContext
-   */
-  public abstract void processElement(ProcessContext c) throws Exception;
-
-  /**
-   * Finishes processing this batch of elements.
-   *
-   * <p>By default, does nothing.
-   */
-  public void finishBundle(Context c) throws Exception {
-  }
-
-  /**
-   * Cleans up this {@link DoFn}.
-   *
-   * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn}
-   * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other
-   * {@link DoFn} methods will be called after a call to {@link #teardown()}.
-   *
-   * <p>By default, does nothing.
-   */
-  public void teardown() throws Exception {
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
deleted file mode 100644
index 2a0b688..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-/**
- * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
- *
- * @param <InputT> the type of the {@link OldDoFn} (main) input elements
- * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
- */
-class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  /** The {@link OldDoFn} being run. */
-  private final OldDoFn<InputT, OutputT> fn;
-  /** The context used for running the {@link OldDoFn}. */
-  private final DoFnContext<InputT, OutputT> context;
-
-  public SimpleOldDoFnRunner(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> additionalOutputTags,
-      StepContext stepContext,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = fn;
-    this.context = new DoFnContext<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        additionalOutputTags,
-        stepContext,
-        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    if (elem.getWindows().size() <= 1
-        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-            && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
-        invokeProcessElement(windowedValue);
-      }
-    }
-  }
-
-  @Override
-  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
-      TimeDomain timeDomain) {
-    throw new UnsupportedOperationException(
-        String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName()));
-  }
-
-  private void invokeProcessElement(WindowedValue<InputT> elem) {
-    final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.processElement(processContext);
-    } catch (Exception ex) {
-      throw wrapUserCodeException(ex);
-    }
-  }
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  /**
-   * Returns a new {@link OldDoFn.ProcessContext} for the given element.
-   */
-  private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
-      WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
-  }
-
-  private RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
-   *
-   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final PipelineOptions options;
-    final OldDoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final StepContext stepContext;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-                       OldDoFn<InputT, OutputT> fn,
-                       SideInputReader sideInputReader,
-                       OutputManager outputManager,
-                       TupleTag<OutputT> mainOutputTag,
-                       List<TupleTag<?>> additionalOutputTags,
-                       StepContext stepContext,
-                       WindowFn<?, ?> windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag<?> additionalOutputTag : additionalOutputTags) {
-        outputTags.add(additionalOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.windowFn = windowFn;
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public W window() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    private <T> void outputWindowedValue(TupleTag<T> tag,
-                                               T output,
-                                               Instant timestamp,
-                                               Collection<? extends BoundedWindow> windows,
-                                               PaneInfo pane) {
-      outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and output
-    // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in OldDoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "TupleTag passed to output cannot be null");
-      outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null");
-      outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-  }
-
-  /**
-   * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn}
-   * over a single element.
-   *
-   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
-   */
-  private static class DoFnProcessContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-
-    final OldDoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-
-    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
-                              DoFnContext<InputT, OutputT> context,
-                              WindowedValue<InputT> windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(
-          view, view.getWindowMappingFn().getSideInputWindow(window));
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a OldDoFn marked as"
-                + "RequiresWindowAccess.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      context.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "Tag passed to output cannot be null");
-      context.outputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
-      checkTimestamp(timestamp);
-      context.outputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
-    }
-
-    private void checkTimestamp(Instant timestamp) {
-      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
-        throw new IllegalArgumentException(String.format(
-            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
-            timestamp, windowedValue.getTimestamp(),
-            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
-      }
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public <AdditionalOutputT> void outputWindowedValue(
-            TupleTag<AdditionalOutputT> tag,
-            AdditionalOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          context.outputWindowedValue(tag, output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public StateInternals stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-          return context.sideInput(view, sideInputWindow);
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
deleted file mode 100644
index 4a58445..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * Adapters from {@link WindowingInternals} to {@link SideInputReader} and {@link
- * OutputWindowedValue}.
- */
-public class WindowingInternalsAdapters {
-  static SideInputReader sideInputReader(final WindowingInternals<?, ?> windowingInternals) {
-    return new SideInputReader() {
-      @Override
-      public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-        return windowingInternals.sideInput(view, sideInputWindow);
-      }
-
-      @Override
-      public <T> boolean contains(PCollectionView<T> view) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public boolean isEmpty() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  public static <OutputT> OutputWindowedValue<OutputT> outputWindowedValue(
-      final WindowingInternals<?, OutputT> windowingInternals) {
-    return new OutputWindowedValue<OutputT>() {
-      @Override
-      public void outputWindowedValue(
-          OutputT output,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo pane) {
-        windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
-      }
-
-      @Override
-      public <AdditionalOutputT> void outputWindowedValue(
-          TupleTag<AdditionalOutputT> tag,
-          AdditionalOutputT output,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo pane) {
-        windowingInternals.outputWindowedValue(tag, output, timestamp, windows, pane);
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
deleted file mode 100644
index a265ead..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.runners.core.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link GroupAlsoByWindowViaOutputBufferDoFn}.
- */
-@RunWith(JUnit4.class)
-public class GroupAlsoByWindowViaOutputBufferDoFnTest {
-
-  private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
-  implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
-
-    private final Coder<InputT> inputCoder;
-
-    public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
-      this.inputCoder = inputCoder;
-    }
-
-    @Override
-    public <W extends BoundedWindow>
-    GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
-            WindowingStrategy<?, W> windowingStrategy,
-            StateInternalsFactory<K> stateInternalsFactory) {
-      return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
-          windowingStrategy,
-          stateInternalsFactory,
-          SystemReduceFn.<K, InputT, W>buffering(inputCoder));
-    }
-  }
-
-  @Test
-  public void testEmptyInputEmptyOutput() throws Exception {
-    GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
-        new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSlidingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoSessions() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-}