You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/20 22:38:34 UTC

[1/4] beam git commit: Moves OldDoFn to runners-core

Repository: beam
Updated Branches:
  refs/heads/master 77c7505b8 -> a6810372b


http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
deleted file mode 100644
index 7b04533..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code OldDoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code OldDoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output.  Unit testing of a {@code OldDoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFn} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}.
- */
-@Deprecated
-public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-  /**
-   * Information accessible to all methods in this {@code OldDoFn}.
-   * Used primarily to output elements.
-   */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link org.apache.beam.sdk.runners.PipelineRunner}
-     * invoking this {@code OldDoFn}.  The {@code PipelineOptions} will
-     * be the default running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should be considered
-     * immutable and not be modified in any way. It may be cached or retained
-     * by a Beam runner or later steps in the pipeline, or used in
-     * other unspecified ways.
-     *
-     * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to {@link OldDoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the side output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code sideOutput} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
-     * specify the tags of side outputs that it consumes. Non-consumed side
-     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
-     * need to be specified.
-     *
-     * <p>The output element will have the same timestamp and be in the same
-     * windows as the input element passed to {@link OldDoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified side output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-
-    /**
-     * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
-     * specified name and aggregation logic specified by {@link CombineFn}.
-     *
-     * <p>For internal use only.
-     *
-     * @param name the name of the aggregator
-     * @param combiner the {@link CombineFn} to use in the aggregator
-     * @return an aggregator for the provided name and {@link CombineFn} in this
-     *         context
-     */
-    @Experimental(Kind.AGGREGATOR)
-    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
-    /**
-     * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
-     * usable within this context.
-     *
-     * <p>This method should be called by runners before {@link OldDoFn#startBundle}
-     * is executed.
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected final void setupDelegateAggregators() {
-      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
-        setupDelegateAggregator(aggregator);
-      }
-
-      aggregatorsAreFinal = true;
-    }
-
-    private <AggInputT, AggOutputT> void setupDelegateAggregator(
-        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
-      Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
-          aggregator.getName(), aggregator.getCombineFn());
-
-      aggregator.setDelegate(delegate);
-    }
-  }
-
-  /**
-   * Information accessible when running {@link OldDoFn#processElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element should be considered immutable. A Beam runner will not mutate the
-     * element, so it is safe to cache, etc. The element should not be mutated by any of the
-     * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner
-     * runtime, or used in other unspecified ways.
-     */
-    public abstract InputT element();
-
-    /**
-     * Returns the value of the side input for the window corresponding to the
-     * window of the main input element.
-     *
-     * <p>See
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
-     * for how this corresponding window is determined.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns the window into which the input element has been assigned.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     *
-     * @throws UnsupportedOperationException if this {@link OldDoFn} does
-     * not implement {@link RequiresWindowAccess}.
-     */
-    public abstract BoundedWindow window();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-
-    /**
-     * Returns the process context to use for implementing windowing.
-     */
-    @Experimental
-    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link OldDoFn.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward to future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   *
-   * <p>Note that producing an element whose timestamp is less than the
-   * current timestamp may result in late data, i.e. returning a non-zero
-   * value here does not impact watermark calculations used for firing
-   * windows.
-   *
-   * @deprecated does not interact well with the watermark.
-   */
-  @Deprecated
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /**
-   * Interface for signaling that a {@link OldDoFn} needs to access the window the
-   * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
-   */
-  @Experimental
-  public interface RequiresWindowAccess {}
-
-  public OldDoFn() {
-    this(new HashMap<String, DelegatingAggregator<?, ?>>());
-  }
-
-  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
-    this.aggregators = aggregators;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
-  /**
-   * Prepares this {@link DoFn} instance for processing bundles.
-   *
-   * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other
-   * {@link DoFn} method is called.
-   *
-   * <p>By default, does nothing.
-   */
-  public void setup() throws Exception {
-  }
-
-  /**
-   * Prepares this {@code OldDoFn} instance for processing a batch of elements.
-   *
-   * <p>By default, does nothing.
-   */
-  public void startBundle(Context c) throws Exception {
-  }
-
-  /**
-   * Processes one input element.
-   *
-   * <p>The current element of the input {@code PCollection} is returned by
-   * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam
-   * runner will not mutate the element, so it is safe to cache, etc. The element should not be
-   * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
-   * the Beam runner, or used in other unspecified ways.
-   *
-   * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
-   * Once passed to {@code output} the element should be considered immutable and not be modified in
-   * any way. It may be cached elsewhere, retained by the Beam runner, or used in other
-   * unspecified ways.
-   *
-   * @see ProcessContext
-   */
-  public abstract void processElement(ProcessContext c) throws Exception;
-
-  /**
-   * Finishes processing this batch of elements.
-   *
-   * <p>By default, does nothing.
-   */
-  public void finishBundle(Context c) throws Exception {
-  }
-
-  /**
-   * Cleans up this {@link DoFn}.
-   *
-   * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn}
-   * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other
-   * {@link DoFn} methods will be called after a call to {@link #teardown()}.
-   *
-   * <p>By default, does nothing.
-   */
-  public void teardown() throws Exception {
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
-   * during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this OldDoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-
-    checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
-   * created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this OldDoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
-      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}.
-   */
-  Collection<Aggregator<?, ?>> getAggregators() {
-    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index 1c59af7..72179a3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -40,7 +40,7 @@ public class NameUtils {
   }
 
   private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"};
+      new String[]{"DoFn", "CombineFn", "Fn"};
 
   /**
    * Pattern to match a non-anonymous inner class.

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
deleted file mode 100644
index f51a6b0..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DelegatingAggregator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnDelegatingAggregatorTest {
-
-  @Mock
-  private Aggregator<Long, Long> delegate;
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testAddValueWithoutDelegateThrowsException() {
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("cannot be called");
-    thrown.expectMessage("DoFn");
-
-    aggregator.addValue(21.2);
-  }
-
-  @Test
-  public void testSetDelegateThenAddValueCallsDelegate() {
-    String name = "agg";
-    CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Long, Long> aggregator =
-        (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
-
-    aggregator.setDelegate(delegate);
-
-    aggregator.addValue(12L);
-
-    verify(delegate).addValue(12L);
-  }
-
-  @Test
-  public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    @SuppressWarnings("unchecked")
-    Aggregator<Double, Double> secondDelegate =
-        mock(Aggregator.class, "secondDelegate");
-
-    aggregator.setDelegate(aggregator);
-    aggregator.setDelegate(secondDelegate);
-
-    aggregator.addValue(2.25);
-
-    verify(secondDelegate).addValue(2.25);
-    verify(delegate, never()).addValue(anyLong());
-  }
-
-  @Test
-  public void testGetNameReturnsName() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-  }
-
-  @Test
-  public void testGetCombineFnReturnsCombineFn() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> CombineFn<T, ?, T> mockCombineFn(
-      @SuppressWarnings("unused") Class<T> clazz) {
-    return mock(CombineFn.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
deleted file mode 100644
index 0db130d..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * A {@link OldDoFn} that does nothing with provided elements. Used for testing
- * methods provided by the {@link OldDoFn} abstract class.
- *
- * @param <InputT> unused.
- * @param <OutputT> unused.
- */
-class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-  @Override
-  public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
-  }
-
-  /**
-   * Returns a new NoOp Context.
-   */
-  public OldDoFn<InputT, OutputT>.Context context() {
-    return new NoOpDoFnContext();
-  }
-
-  /**
-   * A {@link OldDoFn.Context} that does nothing and returns exclusively null.
-   */
-  private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-    @Override
-    public void output(OutputT output) {
-    }
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-    }
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-    }
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
-        Instant timestamp) {
-    }
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
deleted file mode 100644
index b5cb286..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link OldDoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnContextTest {
-
-  @Mock
-  private Aggregator<Long, Long> agg;
-
-  private OldDoFn<Object, Object> fn;
-  private OldDoFn<Object, Object>.Context context;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-
-    // Need to be real objects to call the constructor, and to reference the
-    // outer instance of OldDoFn
-    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
-    OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
-    fn = spy(noOpFn);
-    context = spy(noOpContext);
-  }
-
-  @Test
-  public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-    Aggregator<Long, Long> delegateAggregator =
-        fn.createAggregator("test", combiner);
-
-    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
-    context.setupDelegateAggregators();
-    delegateAggregator.addValue(1L);
-
-    verify(agg).addValue(1L);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
deleted file mode 100644
index 1c767b1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for OldDoFn.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnTest implements Serializable {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testCreateAggregatorWithCombinerSucceeds() {
-    String name = "testAggregator";
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullNameThrowsException() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("name cannot be null");
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator(null, Sum.ofLongs());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullCombineFnThrowsException() {
-    CombineFn<Object, Object, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
-    SerializableFunction<Iterable<Object>, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithSameNameThrowsException() {
-    String name = "testAggregator";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Cannot create");
-    thrown.expectMessage(name);
-    thrown.expectMessage("already exists");
-
-    doFn.createAggregator(name, combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
-    String nameOne = "testAggregator";
-    String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    Aggregator<Double, Double> aggregatorOne =
-        doFn.createAggregator(nameOne, combiner);
-    Aggregator<Double, Double> aggregatorTwo =
-        doFn.createAggregator(nameTwo, combiner);
-
-    assertNotEquals(aggregatorOne, aggregatorTwo);
-  }
-
-  @Test
-  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
-    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception { }
-    };
-    OldDoFn<String, String>.Context context = createContext(fn);
-    context.setupDelegateAggregators();
-
-    thrown.expect(isA(IllegalStateException.class));
-    fn.createAggregator("anyAggregate", Max.ofIntegers());
-  }
-
-  private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
-    return fn.new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void output(String output) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void outputWithTimestamp(String output, Instant timestamp) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <AggInputT, AggOutputT>
-      Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  @Test
-  public void testPopulateDisplayDataDefaultBehavior() {
-    OldDoFn<String, String> usesDefault =
-        new OldDoFn<String, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {}
-        };
-
-    DisplayData data = DisplayData.from(usesDefault);
-    assertThat(data.items(), empty());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index b81aa36..6848ea4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -42,22 +42,18 @@ public class NameUtilsTest {
 
   @Test
   public void testDropsStandardSuffixes() {
-    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", true));
     assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", true));
     assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", true));
 
-    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", false));
     assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", false));
     assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", false));
   }
 
   @Test
   public void testDropsStandardSuffixesInAllComponents() {
-    assertEquals("Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", true));
     assertEquals("Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", true));
     assertEquals("Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", true));
 
-    assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", false));
     assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", false));
     assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", false));
   }
@@ -79,12 +75,12 @@ public class NameUtilsTest {
   /**
    * Inner class for simple name test.
    */
-  private class EmbeddedOldDoFn {
+  private class EmbeddedDoFn {
 
-    private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {}
+    private class DeeperEmbeddedDoFn extends EmbeddedDoFn {}
 
-    private EmbeddedOldDoFn getEmbedded() {
-      return new DeeperEmbeddedOldDoFn();
+    private EmbeddedDoFn getEmbedded() {
+      return new DeeperEmbeddedDoFn();
     }
   }
 
@@ -112,18 +108,18 @@ public class NameUtilsTest {
 
   @Test
   public void testSimpleName() {
-    assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn()));
+    assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedDoFn()));
   }
 
   @Test
   public void testAnonSimpleName() throws Exception {
-    assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {}));
+    assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedDoFn() {}));
   }
 
   @Test
   public void testNestedSimpleName() {
-    EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
-    EmbeddedOldDoFn inner = fn.getEmbedded();
+    EmbeddedDoFn fn = new EmbeddedDoFn();
+    EmbeddedDoFn inner = fn.getEmbedded();
 
     assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner));
   }


[2/4] beam git commit: Moves OldDoFn to runners-core

Posted by ke...@apache.org.
Moves OldDoFn to runners-core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f8b8c5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f8b8c5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f8b8c5b

Branch: refs/heads/master
Commit: 5f8b8c5b06cfd49c4293a20dff2eea99f1076444
Parents: 77c7505
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jan 17 16:12:39 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 20 13:31:58 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   2 +-
 .../beam/runners/core/AssignWindowsDoFn.java    |   3 +-
 .../apache/beam/runners/core/DoFnAdapters.java  |   1 -
 .../apache/beam/runners/core/DoFnRunner.java    |   1 -
 .../apache/beam/runners/core/DoFnRunners.java   |   1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   1 -
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   1 -
 .../core/LateDataDroppingDoFnRunner.java        |   1 -
 .../org/apache/beam/runners/core/OldDoFn.java   | 472 +++++++++++++++++++
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   3 +-
 .../core/DoFnDelegatingAggregatorTest.java      | 144 ++++++
 .../apache/beam/runners/core/NoOpOldDoFn.java   |  72 +++
 .../beam/runners/core/OldDoFnContextTest.java   |  72 +++
 .../apache/beam/runners/core/OldDoFnTest.java   | 192 ++++++++
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   2 +-
 .../runners/flink/OldPerKeyCombineFnRunner.java |   2 +-
 .../flink/OldPerKeyCombineFnRunners.java        |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../FlinkMultiOutputProcessContext.java         |   2 +-
 .../functions/FlinkNoElementAssignContext.java  |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../FlinkSingleOutputProcessContext.java        |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |   4 +-
 .../sdk/transforms/DelegatingAggregator.java    |   2 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 465 ------------------
 .../org/apache/beam/sdk/util/NameUtils.java     |   2 +-
 .../DoFnDelegatingAggregatorTest.java           | 142 ------
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |  71 ---
 .../beam/sdk/transforms/OldDoFnContextTest.java |  69 ---
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 187 --------
 .../org/apache/beam/sdk/util/NameUtilsTest.java |  20 +-
 39 files changed, 982 insertions(+), 978 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index ef049e1..50af81d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -23,8 +23,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
 import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 173434f..4c2b461 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -44,6 +44,7 @@ import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOption
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -52,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index de4c15d..808001e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.sdk.coders.Coder;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
index 0eb1667..bbf3574 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
@@ -21,8 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 0f5624f..23aba58 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 7c73a34..66f95db 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 2f3e93c..f3972ae 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index d79683a..ecce4fc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -21,7 +21,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 9a2f8fd..7e96136 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 290171a..9436ccf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -22,7 +22,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
new file mode 100644
index 0000000..b099721
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * The argument to {@link ParDo} providing the code to use to process
+ * elements of the input
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ *
+ * <p>See {@link ParDo} for more explanation, examples of use, and
+ * discussion of constraints on {@code OldDoFn}s, including their
+ * serializability, lack of access to global shared mutable state,
+ * requirements for failure tolerance, and benefits of optimization.
+ *
+ * <p>{@code OldDoFn}s can be tested in the context of a particular
+ * {@code Pipeline} by running that {@code Pipeline} on sample input
+ * and then checking its output.  Unit testing of a {@code OldDoFn},
+ * separately from any {@code ParDo} transform or {@code Pipeline},
+ * can be done via the {@link DoFnTester} harness.
+ *
+ * <p>{@link DoFn} (currently experimental) offers an alternative
+ * mechanism for accessing {@link ProcessContext#window()} without the need
+ * to implement {@link RequiresWindowAccess}.
+ *
+ * <p>See also {@link #processElement} for details on implementing the transformation
+ * from {@code InputT} to {@code OutputT}.
+ *
+ * @param <InputT> the type of the (main) input elements
+ * @param <OutputT> the type of the (main) output elements
+ * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}.
+ */
+@Deprecated
+public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
+  /**
+   * Information accessible to all methods in this {@code OldDoFn}.
+   * Used primarily to output elements.
+   */
+  public abstract class Context {
+
+    /**
+     * Returns the {@code PipelineOptions} specified with the
+     * {@link org.apache.beam.sdk.runners.PipelineRunner}
+     * invoking this {@code OldDoFn}.  The {@code PipelineOptions} will
+     * be the default running via {@link DoFnTester}.
+     */
+    public abstract PipelineOptions getPipelineOptions();
+
+    /**
+     * Adds the given element to the main output {@code PCollection}.
+     *
+     * <p>Once passed to {@code output} the element should be considered
+     * immutable and not be modified in any way. It may be cached or retained
+     * by a Beam runner or later steps in the pipeline, or used in
+     * other unspecified ways.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
+     * element will have the same timestamp and be in the same windows
+     * as the input element passed to {@link OldDoFn#processElement processElement}.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     */
+    public abstract void output(OutputT output);
+
+    /**
+     * Adds the given element to the main output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code outputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     */
+    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
+
+    /**
+     * Adds the given element to the side output {@code PCollection} with the
+     * given tag.
+     *
+     * <p>Once passed to {@code sideOutput} the element should not be modified
+     * in any way.
+     *
+     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
+     * specify the tags of side outputs that it consumes. Non-consumed side
+     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
+     * need to be specified.
+     *
+     * <p>The output element will have the same timestamp and be in the same
+     * windows as the input element passed to {@link OldDoFn#processElement processElement}.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+    /**
+     * Adds the given element to the specified side output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutputWithTimestamp(
+        TupleTag<T> tag, T output, Instant timestamp);
+
+    /**
+     * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
+     * specified name and aggregation logic specified by {@link CombineFn}.
+     *
+     * <p>For internal use only.
+     *
+     * @param name the name of the aggregator
+     * @param combiner the {@link CombineFn} to use in the aggregator
+     * @return an aggregator for the provided name and {@link CombineFn} in this
+     *         context
+     */
+    @Experimental(Kind.AGGREGATOR)
+    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+    /**
+     * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
+     * usable within this context.
+     *
+     * <p>This method should be called by runners before {@link OldDoFn#startBundle}
+     * is executed.
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected final void setupDelegateAggregators() {
+      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+        setupDelegateAggregator(aggregator);
+      }
+
+      aggregatorsAreFinal = true;
+    }
+
+    private <AggInputT, AggOutputT> void setupDelegateAggregator(
+        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+      Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
+          aggregator.getName(), aggregator.getCombineFn());
+
+      aggregator.setDelegate(delegate);
+    }
+  }
+
+  /**
+   * Information accessible when running {@link OldDoFn#processElement}.
+   */
+  public abstract class ProcessContext extends Context {
+
+    /**
+     * Returns the input element to be processed.
+     *
+     * <p>The element should be considered immutable. A Beam runner will not mutate the
+     * element, so it is safe to cache, etc. The element should not be mutated by any of the
+     * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner
+     * runtime, or used in other unspecified ways.
+     */
+    public abstract InputT element();
+
+    /**
+     * Returns the value of the side input for the window corresponding to the
+     * window of the main input element.
+     *
+     * <p>See
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
+     * for how this corresponding window is determined.
+     *
+     * @throws IllegalArgumentException if this is not a side input
+     * @see ParDo#withSideInputs
+     */
+    public abstract <T> T sideInput(PCollectionView<T> view);
+
+    /**
+     * Returns the timestamp of the input element.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract Instant timestamp();
+
+    /**
+     * Returns the window into which the input element has been assigned.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     *
+     * @throws UnsupportedOperationException if this {@link OldDoFn} does
+     * not implement {@link RequiresWindowAccess}.
+     */
+    public abstract BoundedWindow window();
+
+    /**
+     * Returns information about the pane within this window into which the
+     * input element has been assigned.
+     *
+     * <p>Generally all data is in a single, uninteresting pane unless custom
+     * triggering and/or late data has been explicitly requested.
+     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract PaneInfo pane();
+
+    /**
+     * Returns the process context to use for implementing windowing.
+     */
+    @Experimental
+    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
+  }
+
+  /**
+   * Returns the allowed timestamp skew duration, which is the maximum
+   * duration that timestamps can be shifted backward in
+   * {@link OldDoFn.Context#outputWithTimestamp}.
+   *
+   * <p>The default value is {@code Duration.ZERO}, in which case
+   * timestamps can only be shifted forward to future.  For infinite
+   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+   *
+   * <p>Note that producing an element whose timestamp is less than the
+   * current timestamp may result in late data, i.e. returning a non-zero
+   * value here does not impact watermark calculations used for firing
+   * windows.
+   *
+   * @deprecated does not interact well with the watermark.
+   */
+  @Deprecated
+  public Duration getAllowedTimestampSkew() {
+    return Duration.ZERO;
+  }
+
+  /**
+   * Interface for signaling that a {@link OldDoFn} needs to access the window the
+   * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
+   */
+  @Experimental
+  public interface RequiresWindowAccess {}
+
+  public OldDoFn() {
+    this(new HashMap<String, DelegatingAggregator<?, ?>>());
+  }
+
+  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+    this.aggregators = aggregators;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
+
+  /**
+   * Protects aggregators from being created after initialization.
+   */
+  private boolean aggregatorsAreFinal;
+
+  /**
+   * Prepares this {@link DoFn} instance for processing bundles.
+   *
+   * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other
+   * {@link DoFn} method is called.
+   *
+   * <p>By default, does nothing.
+   */
+  public void setup() throws Exception {
+  }
+
+  /**
+   * Prepares this {@code OldDoFn} instance for processing a batch of elements.
+   *
+   * <p>By default, does nothing.
+   */
+  public void startBundle(Context c) throws Exception {
+  }
+
+  /**
+   * Processes one input element.
+   *
+   * <p>The current element of the input {@code PCollection} is returned by
+   * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam
+   * runner will not mutate the element, so it is safe to cache, etc. The element should not be
+   * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
+   * the Beam runner, or used in other unspecified ways.
+   *
+   * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
+   * Once passed to {@code output} the element should be considered immutable and not be modified in
+   * any way. It may be cached elsewhere, retained by the Beam runner, or used in other
+   * unspecified ways.
+   *
+   * @see ProcessContext
+   */
+  public abstract void processElement(ProcessContext c) throws Exception;
+
+  /**
+   * Finishes processing this batch of elements.
+   *
+   * <p>By default, does nothing.
+   */
+  public void finishBundle(Context c) throws Exception {
+  }
+
+  /**
+   * Cleans up this {@link DoFn}.
+   *
+   * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn}
+   * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other
+   * {@link DoFn} methods will be called after a call to {@link #teardown()}.
+   *
+   * <p>By default, does nothing.
+   */
+  public void teardown() throws Exception {
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may override this method
+   * to provide their own display data.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns an {@link Aggregator} with aggregation logic specified by the
+   * {@link CombineFn} argument. The name provided must be unique across
+   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
+   * during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link CombineFn} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this OldDoFn
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline processing.
+   */
+  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+      createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+    checkNotNull(name, "name cannot be null");
+    checkNotNull(combiner, "combiner cannot be null");
+    checkArgument(!aggregators.containsKey(name),
+        "Cannot create aggregator with name %s."
+        + " An Aggregator with that name already exists within this scope.",
+        name);
+
+    checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing."
+        + " Aggregators should be registered during pipeline construction.");
+
+    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
+        new DelegatingAggregator<>(name, combiner);
+    aggregators.put(name, aggregator);
+    return aggregator;
+  }
+
+  /**
+   * Returns an {@link Aggregator} with the aggregation logic specified by the
+   * {@link SerializableFunction} argument. The name provided must be unique
+   * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
+   * created during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link SerializableFunction} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this OldDoFn
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline processing.
+   */
+  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
+      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
+    checkNotNull(combiner, "combiner cannot be null.");
+    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
+  }
+
+  /**
+   * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}.
+   */
+  Collection<Aggregator<?, ?>> getAggregators() {
+    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 9808e56..2fe9226 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -27,11 +27,10 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
new file mode 100644
index 0000000..b44e8a4
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DelegatingAggregator;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DelegatingAggregator}.
+ */
+@RunWith(JUnit4.class)
+public class DoFnDelegatingAggregatorTest {
+
+  @Mock
+  private Aggregator<Long, Long> delegate;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testAddValueWithoutDelegateThrowsException() {
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("cannot be called");
+    thrown.expectMessage("DoFn");
+
+    aggregator.addValue(21.2);
+  }
+
+  @Test
+  public void testSetDelegateThenAddValueCallsDelegate() {
+    String name = "agg";
+    CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Long, Long> aggregator =
+        (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
+
+    aggregator.setDelegate(delegate);
+
+    aggregator.addValue(12L);
+
+    verify(delegate).addValue(12L);
+  }
+
+  @Test
+  public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    @SuppressWarnings("unchecked")
+    Aggregator<Double, Double> secondDelegate =
+        mock(Aggregator.class, "secondDelegate");
+
+    aggregator.setDelegate(aggregator);
+    aggregator.setDelegate(secondDelegate);
+
+    aggregator.addValue(2.25);
+
+    verify(secondDelegate).addValue(2.25);
+    verify(delegate, never()).addValue(anyLong());
+  }
+
+  @Test
+  public void testGetNameReturnsName() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    assertEquals(name, aggregator.getName());
+  }
+
+  @Test
+  public void testGetCombineFnReturnsCombineFn() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> CombineFn<T, ?, T> mockCombineFn(
+      @SuppressWarnings("unused") Class<T> clazz) {
+    return mock(CombineFn.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
new file mode 100644
index 0000000..5cbea8c
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A {@link OldDoFn} that does nothing with provided elements. Used for testing
+ * methods provided by the {@link OldDoFn} abstract class.
+ *
+ * @param <InputT> unused.
+ * @param <OutputT> unused.
+ */
+class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+  @Override
+  public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
+  }
+
+  /**
+   * Returns a new NoOp Context.
+   */
+  public OldDoFn<InputT, OutputT>.Context context() {
+    return new NoOpDoFnContext();
+  }
+
+  /**
+   * A {@link OldDoFn.Context} that does nothing and returns exclusively null.
+   */
+  private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+    @Override
+    public void output(OutputT output) {
+    }
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+    }
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+    }
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
+        Instant timestamp) {
+    }
+    @Override
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
new file mode 100644
index 0000000..a1cd49d
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Sum;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link OldDoFn.Context}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnContextTest {
+
+  @Mock
+  private Aggregator<Long, Long> agg;
+
+  private OldDoFn<Object, Object> fn;
+  private OldDoFn<Object, Object>.Context context;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    // Need to be real objects to call the constructor, and to reference the
+    // outer instance of OldDoFn
+    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
+    OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
+
+    fn = spy(noOpFn);
+    context = spy(noOpContext);
+  }
+
+  @Test
+  public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
+    Aggregator<Long, Long> delegateAggregator =
+        fn.createAggregator("test", combiner);
+
+    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+    context.setupDelegateAggregators();
+    delegateAggregator.addValue(1L);
+
+    verify(agg).addValue(1L);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
new file mode 100644
index 0000000..651bc72
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for OldDoFn.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnTest implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testCreateAggregatorWithCombinerSucceeds() {
+    String name = "testAggregator";
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
+
+    assertEquals(name, aggregator.getName());
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullNameThrowsException() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("name cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator(null, Sum.ofLongs());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullCombineFnThrowsException() {
+    CombineFn<Object, Object, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+    SerializableFunction<Iterable<Object>, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithSameNameThrowsException() {
+    String name = "testAggregator";
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator(name, combiner);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot create");
+    thrown.expectMessage(name);
+    thrown.expectMessage("already exists");
+
+    doFn.createAggregator(name, combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+    String nameOne = "testAggregator";
+    String nameTwo = "aggregatorPrime";
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    Aggregator<Double, Double> aggregatorOne =
+        doFn.createAggregator(nameOne, combiner);
+    Aggregator<Double, Double> aggregatorTwo =
+        doFn.createAggregator(nameTwo, combiner);
+
+    assertNotEquals(aggregatorOne, aggregatorTwo);
+  }
+
+  @Test
+  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception { }
+    };
+    OldDoFn<String, String>.Context context = createContext(fn);
+    context.setupDelegateAggregators();
+
+    thrown.expect(isA(IllegalStateException.class));
+    fn.createAggregator("anyAggregate", Max.ofIntegers());
+  }
+
+  private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
+    return fn.new Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void output(String output) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void outputWithTimestamp(String output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <AggInputT, AggOutputT>
+      Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Test
+  public void testPopulateDisplayDataDefaultBehavior() {
+    OldDoFn<String, String> usesDefault =
+        new OldDoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {}
+        };
+
+    DisplayData data = DisplayData.from(usesDefault);
+    assertThat(data.items(), empty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 4610069..97da9ee 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -23,7 +23,7 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
+
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
index 5d676dc..71c3aa4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.flink;
 
 import java.io.Serializable;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 
 /**
  * An interface that runs a {@link PerKeyCombineFn} with unified APIs using

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
index 8ebeadf..90894f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.runners.flink;
 
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 2a4a68e..8b2bcc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 1b43172..5ec6a77 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,12 +24,12 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index a97bd46..aeeabbf 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index a3d2b18..7882b5f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Collection;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index c890272..ad7255b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index f5a9087..7db30d1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,12 +24,12 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 53b9803..e955679 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -24,11 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index a3fa0d4..81e37f4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,12 +26,12 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
index 529b1cc..0db7f5a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Collection;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 90cdf4c..ac85b3c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index cd6b5aa..d4273b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -42,12 +42,12 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 0c5be90..4d80a39 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,8 +40,7 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
   private final Map<Long, TupleTag<?>> outputMap;
 
   /**
-   * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a
-   * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
+   * Creates a {@link DoFnInfo} for the given {@link DoFn}.
    */
   public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
       DoFn<InputT, OutputT> doFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
index e03d3b1..cfaf0a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  * @param <AggInputT> the type of input element
  * @param <AggOutputT> the type of output element
  */
-class DelegatingAggregator<AggInputT, AggOutputT>
+public class DelegatingAggregator<AggInputT, AggOutputT>
     implements Aggregator<AggInputT, AggOutputT>, Serializable {
   private static final AtomicInteger ID_GEN = new AtomicInteger();
   private final int id;


[3/4] beam git commit: Point to new Dataflow worker

Posted by ke...@apache.org.
Point to new Dataflow worker


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a2b94eca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a2b94eca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a2b94eca

Branch: refs/heads/master
Commit: a2b94ecabd80ef68654ba3bc8ffa0a0c3759316b
Parents: 5f8b8c5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jan 20 13:31:30 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 20 13:32:19 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/dataflow.properties         | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a2b94eca/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 161a897..9976ed9 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 
 environment.major.version=6
 
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170119
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170120
 
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170119
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170120


[4/4] beam git commit: This closes #1636: Move OldDoFn to runners-core

Posted by ke...@apache.org.
This closes #1636: Move OldDoFn to runners-core

  Point to new Dataflow worker
  Moves OldDoFn to runners-core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6810372
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6810372
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6810372

Branch: refs/heads/master
Commit: a6810372b003adf24bdbe34ed764a63841af9b99
Parents: 77c7505 a2b94ec
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 20 14:22:38 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 20 14:22:38 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   2 +-
 .../beam/runners/core/AssignWindowsDoFn.java    |   3 +-
 .../apache/beam/runners/core/DoFnAdapters.java  |   1 -
 .../apache/beam/runners/core/DoFnRunner.java    |   1 -
 .../apache/beam/runners/core/DoFnRunners.java   |   1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   1 -
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   1 -
 .../core/LateDataDroppingDoFnRunner.java        |   1 -
 .../org/apache/beam/runners/core/OldDoFn.java   | 472 +++++++++++++++++++
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   3 +-
 .../core/DoFnDelegatingAggregatorTest.java      | 144 ++++++
 .../apache/beam/runners/core/NoOpOldDoFn.java   |  72 +++
 .../beam/runners/core/OldDoFnContextTest.java   |  72 +++
 .../apache/beam/runners/core/OldDoFnTest.java   | 192 ++++++++
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   2 +-
 .../runners/flink/OldPerKeyCombineFnRunner.java |   2 +-
 .../flink/OldPerKeyCombineFnRunners.java        |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../FlinkMultiOutputProcessContext.java         |   2 +-
 .../functions/FlinkNoElementAssignContext.java  |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../FlinkSingleOutputProcessContext.java        |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |   4 +-
 .../beam/runners/dataflow/dataflow.properties   |   4 +-
 .../sdk/transforms/DelegatingAggregator.java    |   2 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 465 ------------------
 .../org/apache/beam/sdk/util/NameUtils.java     |   2 +-
 .../DoFnDelegatingAggregatorTest.java           | 142 ------
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |  71 ---
 .../beam/sdk/transforms/OldDoFnContextTest.java |  69 ---
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 187 --------
 .../org/apache/beam/sdk/util/NameUtilsTest.java |  20 +-
 40 files changed, 984 insertions(+), 980 deletions(-)
----------------------------------------------------------------------