You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/17 17:53:07 UTC

[1/7] beam git commit: Removes code for wrapping DoFn as an OldDoFn

Repository: beam
Updated Branches:
  refs/heads/master eaf4450f2 -> a91571ef9


Removes code for wrapping DoFn as an OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad5eb066
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad5eb066
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad5eb066

Branch: refs/heads/master
Commit: ad5eb06619b724236ad0d2a384b8ecf4c610f1e4
Parents: f1ea8f9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:21:40 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jan 12 12:55:27 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/DoFnAdapters.java       | 150 ----------
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 295 +------------------
 .../sdk/transforms/reflect/DoFnInvokers.java    | 141 +--------
 .../transforms/reflect/DoFnInvokersTest.java    |  36 ---
 4 files changed, 11 insertions(+), 611 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index e15b08b..d1c40a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -38,7 +36,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -53,18 +50,6 @@ public class DoFnAdapters {
   /** Should not be instantiated. */
   private DoFnAdapters() {}
 
-  /**
-   * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
-   * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
-   */
-  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
-    } else {
-      return fn.getClass();
-    }
-  }
-
   /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
   @SuppressWarnings({"unchecked", "rawtypes"})
   public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
@@ -76,126 +61,6 @@ public class DoFnAdapters {
     }
   }
 
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.ProcessContext c,
-      final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
-    return fn.new ProcessContext() {
-      @Override
-      public InputT element() {
-        return c.element();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-
-      @Override
-      public Instant timestamp() {
-        return c.timestamp();
-      }
-
-      @Override
-      public BoundedWindow window() {
-        return extra.window();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return c.pane();
-      }
-
-      @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        return extra.windowingInternals();
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.Context c) {
-    return fn.new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /**
-   * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
-   * returns {@code null}.
-   */
-  @Nullable
-  public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
-    } else {
-      return null;
-    }
-  }
-
   /**
    * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
    * OldDoFn}.
@@ -238,21 +103,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return fn.getOutputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return fn.getAggregators();
-    }
-
-    @Override
     public Duration getAllowedTimestampSkew() {
       return fn.getAllowedTimestampSkew();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 2d2c1fd..0aef552 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -71,21 +70,6 @@ import org.joda.time.Instant;
  */
 @Deprecated
 public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  public DoFn<InputT, OutputT> toDoFn() {
-    DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
-    if (doFn != null) {
-      return doFn;
-    }
-    if (this instanceof RequiresWindowAccess) {
-      // No parameters as it just accesses `this`
-      return new AdaptedRequiresWindowAccessDoFn();
-    } else {
-      // No parameters as it just accesses `this`
-      return new AdaptedDoFn();
-    }
-  }
-
   /**
    * Information accessible to all methods in this {@code OldDoFn}.
    * Used primarily to output elements.
@@ -334,7 +318,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     this(new HashMap<String, DelegatingAggregator<?, ?>>());
   }
 
-  OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
     this.aggregators = aggregators;
   }
 
@@ -419,32 +403,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code OldDoFn} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code OldDoFn} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code OldDoFn} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<OutputT>} for the output
-   * {@code PCollection<OutputT>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
    * Returns an {@link Aggregator} with aggregation logic specified by the
    * {@link CombineFn} argument. The name provided must be unique across
    * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
@@ -504,255 +462,4 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   Collection<Aggregator<?, ?>> getAggregators() {
     return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
   }
-
-  /**
-   * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedContext extends Context {
-
-    private final DoFn<InputT, OutputT>.Context newContext;
-
-    public AdaptedContext(
-        DoFn<InputT, OutputT>.Context newContext) {
-      this.newContext = newContext;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return newContext.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      newContext.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      newContext.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      newContext.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      newContext.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return newContext.createAggregator(name, combiner);
-    }
-  }
-
-  /**
-   * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedProcessContext extends ProcessContext {
-
-    private final DoFn<InputT, OutputT>.ProcessContext newContext;
-
-    public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
-      this.newContext = newContext;
-    }
-
-    @Override
-    public InputT element() {
-      return newContext.element();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return newContext.sideInput(view);
-    }
-
-    @Override
-    public Instant timestamp() {
-      return newContext.timestamp();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(String.format(
-          "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
-          OldDoFn.class.getSimpleName(),
-          OldDoFn.ProcessContext.class.getSimpleName(),
-          OldDoFn.class.getSimpleName(),
-          DoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return newContext.pane();
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException(String.format(
-          "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
-          OldDoFn.class.getSimpleName(),
-          OldDoFn.ProcessContext.class.getSimpleName(),
-          OldDoFn.class.getSimpleName(),
-          DoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return newContext.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      newContext.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      newContext.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      newContext.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      newContext.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return newContext.createAggregator(name, combiner);
-    }
-  }
-
-  private class AdaptedDoFn extends DoFn<InputT, OutputT> {
-
-    @Setup
-    public void setup() throws Exception {
-      OldDoFn.this.setup();
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      OldDoFn.this.teardown();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return OldDoFn.this.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      OldDoFn.this.populateDisplayData(builder);
-    }
-
-    @Override
-    public TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return OldDoFn.this.getInputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return OldDoFn.this.getAggregators();
-    }
-
-    @Override
-    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return OldDoFn.this.getOutputTypeDescriptor();
-    }
-  }
-
-  /**
-   * A {@link ProcessContext} for an {@link OldDoFn} that implements
-   * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
-
-    private final BoundedWindow window;
-
-    public AdaptedRequiresWindowAccessProcessContext(
-        DoFn<InputT, OutputT>.ProcessContext newContext,
-        BoundedWindow window) {
-      super(newContext);
-      this.window = window;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return window;
-    }
-  }
-
-  private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
-
-    @Setup
-    public void setup() throws Exception {
-      OldDoFn.this.setup();
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      OldDoFn.this.processElement(
-          OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      OldDoFn.this.teardown();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return OldDoFn.this.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      OldDoFn.this.populateDisplayData(builder);
-    }
-
-    @Override
-    public TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return OldDoFn.this.getInputTypeDescriptor();
-    }
-
-    @Override
-    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return OldDoFn.this.getOutputTypeDescriptor();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 50a7082..b141d51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,13 +18,7 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.util.UserCodeException;
 
 /** Static utilities for working with {@link DoFnInvoker}. */
 public class DoFnInvokers {
@@ -42,137 +36,22 @@ public class DoFnInvokers {
     return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
   }
 
-  private DoFnInvokers() {}
-
   /**
-   * Returns a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link
-   * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an
-   * {@link Object} and then pass it to this method, so there is no need to statically specify what
-   * sort of object it is.
+   * Temporarily retained for compatibility with Dataflow worker.
+   * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
    *
-   * @deprecated this is to be used only as a migration path for decoupling upgrades
+   * @deprecated Use {@link #invokerFor(DoFn)}.
    */
+  @SuppressWarnings("unchecked")
   @Deprecated
-  public static DoFnInvoker<?, ?> invokerFor(Serializable deserializedFn) {
+  public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
+      Serializable deserializedFn) {
     if (deserializedFn instanceof DoFn) {
-      return invokerFor((DoFn<?, ?>) deserializedFn);
-    } else if (deserializedFn instanceof OldDoFn) {
-      return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
-    } else {
-      throw new IllegalArgumentException(
-          String.format(
-              "Cannot create a %s for %s; it should be either a %s or an %s.",
-              DoFnInvoker.class.getSimpleName(),
-              deserializedFn.toString(),
-              DoFn.class.getSimpleName(),
-              OldDoFn.class.getSimpleName()));
+      return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
     }
+    throw new UnsupportedOperationException(
+        "Only DoFn supported, was: " + deserializedFn.getClass());
   }
 
-  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
-  @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers();
-
-  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
-  @Deprecated
-  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(Object deserializedFn) {
-    return (DoFnInvoker<InputT, OutputT>) DoFnInvokers.invokerFor((Serializable) deserializedFn);
-  }
-
-
-  static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> {
-
-    private final OldDoFn<InputT, OutputT> fn;
-
-    public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
-      this.fn = fn;
-    }
-
-    @Override
-    public DoFn.ProcessContinuation invokeProcessElement(
-        ArgumentProvider<InputT, OutputT> extra) {
-      // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly
-      DoFn<InputT, OutputT>.ProcessContext newCtx =
-          extra.processContext(new DoFn<InputT, OutputT>() {});
-      OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
-          DoFnAdapters.adaptProcessContext(fn, newCtx, extra);
-      try {
-        fn.processElement(oldCtx);
-        return DoFn.ProcessContinuation.stop();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments) {
-      throw new UnsupportedOperationException(
-          String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public void invokeStartBundle(DoFn.Context c) {
-      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
-      try {
-        fn.startBundle(oldCtx);
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeFinishBundle(DoFn.Context c) {
-      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
-      try {
-        fn.finishBundle(oldCtx);
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeSetup() {
-      try {
-        fn.setup();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeTeardown() {
-      try {
-        fn.teardown();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
-        CoderRegistry coderRegistry) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT> void invokeSplitRestriction(
-        InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-        TrackerT invokeNewTracker(RestrictionT restriction) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public DoFn<InputT, OutputT> getFn() {
-      throw new UnsupportedOperationException("getFn is not supported for OldDoFn");
-    }
-  }
+  private DoFnInvokers() {}
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 456a6eb..55b8e7e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -738,39 +737,4 @@ public class DoFnInvokersTest {
     invoker.invokeOnTimer(timerId, mockArgumentProvider);
     assertThat(fn.window, equalTo(testWindow));
   }
-
-  private class OldDoFnIdentity extends OldDoFn<String, String> {
-    public void processElement(ProcessContext c) {}
-  }
-
-  @Test
-  public void testOldDoFnProcessElement() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn)
-        .invokeProcessElement(mockArgumentProvider);
-    verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
-  }
-
-  @Test
-  public void testOldDoFnStartBundle() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext);
-    verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class));
-  }
-
-  @Test
-  public void testOldDoFnFinishBundle() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext);
-    verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class));
-  }
-
-  @Test
-  public void testOldDoFnSetup() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup();
-    verify(mockOldDoFn).setup();
-  }
-
-  @Test
-  public void testOldDoFnTeardown() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown();
-    verify(mockOldDoFn).teardown();
-  }
 }


[3/7] beam git commit: Moves DoFnAdapters to runners-core

Posted by ke...@apache.org.
Moves DoFnAdapters to runners-core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/149d52b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/149d52b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/149d52b5

Branch: refs/heads/master
Commit: 149d52b56787bf3620db6b3adbad373366074a5d
Parents: 50979f7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:28:16 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jan 12 12:57:14 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apache/beam/runners/core/DoFnAdapters.java  | 344 +++++++++++++++++++
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../sdk/transforms/AggregatorRetriever.java     |  13 +-
 .../beam/sdk/transforms/DoFnAdapters.java       | 340 ------------------
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   2 +-
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |   2 +-
 14 files changed, 367 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index 33b9269..ef049e1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -22,8 +22,8 @@ import java.util.Collections;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 49ec1c8..173434f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -413,7 +413,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index c41cd45..1a3387c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -38,6 +38,7 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
@@ -49,7 +50,6 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
new file mode 100644
index 0000000..0f5624f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AggregatorRetriever;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
+ *     DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+  /** Should not be instantiated. */
+  private DoFnAdapters() {}
+
+  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
+    if (signature.processElement().observesWindow()) {
+      return new WindowDoFnAdapter<>(fn);
+    } else {
+      return new SimpleDoFnAdapter<>(fn);
+    }
+  }
+
+  /**
+   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+   * OldDoFn}.
+   */
+  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+    private final DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(AggregatorRetriever.getDelegatingAggregators(fn));
+      this.fn = fn;
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      this.invoker.invokeSetup();
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      fn.prepareForProcessing();
+      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void teardown() throws Exception {
+      this.invoker.invokeTeardown();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+      invoker.invokeProcessElement(adapter);
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return fn.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(fn);
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+  }
+
+  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+      implements OldDoFn.RequiresWindowAccess {
+
+    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn);
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
+   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+   * unavailable.
+   */
+  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.Context context;
+
+    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+      fn.super();
+      this.context = context;
+      super.setupDelegateAggregators();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public BoundedWindow window() {
+      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
+      // should be unreachable.
+      throw new UnsupportedOperationException(
+          "Can only get the window in processElement; elsewhere there is no defined window.");
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Can only get a ProcessContext in processElement");
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
+   */
+  private static class ProcessContextAdapter<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+    private ProcessContextAdapter(
+        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+      fn.super();
+      this.context = context;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return context.sideInput(view);
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public InputT element() {
+      return context.element();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return context.timestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return context.pane();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return context.window();
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 1ff0212..9808e56 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -330,7 +330,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
       return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
@@ -512,7 +512,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
     }
 
     @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+    public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
         createAggregatorInternal(
             String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
       return context.createAggregatorInternal(name, combiner);

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 97b67c6..ef01106 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -744,7 +744,7 @@ public class GroupAlsoByWindowsProperties {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ed200d5..2a4a68e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -18,10 +18,10 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 7f6a436..a97bd46 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -18,10 +18,10 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 6afca38..53b9803 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -252,7 +252,7 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
   public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
 
   @Override
-  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+  public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
     @SuppressWarnings("unchecked")
     SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 057a3e7..95f2bfd 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index ce47e22..b1d3ead 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -18,9 +18,10 @@
 package org.apache.beam.sdk.transforms;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}.
+ * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
  */
 public final class AggregatorRetriever {
   private AggregatorRetriever() {
@@ -28,9 +29,17 @@ public final class AggregatorRetriever {
   }
 
   /**
-   * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
+   * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
    */
   public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
     return fn.getAggregators();
   }
+
+  /**
+   * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link
+   * DoFn}.
+   */
+  public static Map<String, DelegatingAggregator<?, ?>> getDelegatingAggregators(DoFn<?, ?> fn) {
+    return fn.aggregators;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
deleted file mode 100644
index 0a71faa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import java.io.IOException;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- *     DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
-  /** Should not be instantiated. */
-  private DoFnAdapters() {}
-
-  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
-    if (signature.processElement().observesWindow()) {
-      return new WindowDoFnAdapter<>(fn);
-    } else {
-      return new SimpleDoFnAdapter<>(fn);
-    }
-  }
-
-  /**
-   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
-   * OldDoFn}.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-    private final DoFn<InputT, OutputT> fn;
-    private transient DoFnInvoker<InputT, OutputT> invoker;
-
-    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn.aggregators);
-      this.fn = fn;
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-
-    @Override
-    public void setup() throws Exception {
-      this.invoker.invokeSetup();
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      fn.prepareForProcessing();
-      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void teardown() throws Exception {
-      this.invoker.invokeTeardown();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
-      invoker.invokeProcessElement(adapter);
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return fn.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(fn);
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-  }
-
-  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
-  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
-      implements OldDoFn.RequiresWindowAccess {
-
-    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn);
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
-   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
-   * unavailable.
-   */
-  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
-      fn.super();
-      this.context = context;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name,
-        CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the window in processElement; elsewhere there is no defined window.");
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a ProcessContext in processElement");
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
-    }
-
-    @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
-    }
-
-    @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 0aef552..7b04533 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -192,7 +192,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
      *         context
      */
     @Experimental(Kind.AGGREGATOR)
-    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
         createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
index 504480b..0db130d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
@@ -63,7 +63,7 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
         Instant timestamp) {
     }
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
         createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       return null;
     }


[2/7] beam git commit: Removes ArgumentProvider.windowingInternals

Posted by ke...@apache.org.
Removes ArgumentProvider.windowingInternals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50979f72
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50979f72
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50979f72

Branch: refs/heads/master
Commit: 50979f7262203987ef3ec4a3fbfeeb3f4ae769e7
Parents: ad5eb06
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:23:15 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jan 12 12:56:27 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 63 --------------------
 .../beam/runners/core/SplittableParDo.java      |  7 ---
 .../beam/sdk/transforms/DoFnAdapters.java       | 14 -----
 .../apache/beam/sdk/transforms/DoFnTester.java  |  7 ---
 .../sdk/transforms/reflect/DoFnInvoker.java     | 20 -------
 .../transforms/reflect/DoFnInvokersTest.java    |  6 --
 6 files changed, 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index b42c57d..df5f3f6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -56,10 +56,8 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -451,11 +449,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException("WindowingInternals are unsupported.");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
       throw new UnsupportedOperationException(
           "Cannot access RestrictionTracker outside of @ProcessElement method.");
@@ -670,57 +663,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         throw new RuntimeException(e);
       }
     }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return stepContext.stateInternals();
-        }
-
-        @Override
-        public void outputWindowedValue(
-            OutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException("A DoFn cannot output to a different window");
-        }
-
-        @Override
-        public <SideOutputT> void sideOutputWindowedValue(
-            TupleTag<SideOutputT> tag,
-            SideOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException(
-              "A DoFn cannot side output to a different window");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-          return context.sideInput(view, sideInputWindow);
-        }
-      };
-    }
-
   }
 
   /**
@@ -871,11 +813,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method");
     }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException("WindowingInternals are unsupported.");
-    }
   }
 
   private static class TimerInternalsTimer implements Timer {

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index e6a2466..f8d12ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateInternals;
@@ -685,12 +684,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       }
 
       @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
-        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
-      }
-
-      @Override
       public TrackerT restrictionTracker() {
         return tracker;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index d1c40a6..0a71faa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -202,14 +201,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get WindowingInternals in processElement");
-    }
-
-    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }
@@ -322,11 +313,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
-    }
-
-    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 2d8684a..b2c3fd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -328,12 +327,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public WindowingInternals<InputT, OutputT> windowingInternals() {
-              throw new UnsupportedOperationException(
-                  "Not expected to access WindowingInternals from a new DoFn");
-            }
-
-            @Override
             public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 97ac9d3..354578e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -27,11 +27,9 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 
 /**
@@ -122,19 +120,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     OutputReceiver<OutputT> outputReceiver();
 
     /**
-     * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so
-     * an {@link OldDoFn} can be run via {@link DoFnInvoker}.
-     *
-     * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}.
-     *
-     * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state
-     *     and timers, they will need to wait for the arrival of those features. Do not introduce
-     *     new uses of this method.
-     */
-    @Deprecated
-    WindowingInternals<InputT, OutputT> windowingInternals();
-
-    /**
      * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
      * the current {@link ProcessElement} call.
      */
@@ -180,11 +165,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return null;
-    }
-
-    @Override
     public State state(String stateId) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 55b8e7e..4c6bee1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -51,7 +50,6 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -77,18 +75,14 @@ public class DoFnInvokersTest {
   @Mock private IntervalWindow mockWindow;
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
-  @Mock private WindowingInternals<String, String> mockWindowingInternals;
   @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
 
-  @Mock private OldDoFn<String, String> mockOldDoFn;
-
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     when(mockArgumentProvider.window()).thenReturn(mockWindow);
     when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider);
     when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver);
-    when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals);
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }
 


[5/7] beam git commit: Removes some OldDoFn code from DoFnRunners

Posted by ke...@apache.org.
Removes some OldDoFn code from DoFnRunners

DoFnRunners.createDefault() can be replaced with simpleRunner()
at the existing call sites, since it is never called with a
ReduceFnExecutor at those call sites.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b26ec89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b26ec89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b26ec89

Branch: refs/heads/master
Commit: 2b26ec8934725a600954ced9c4063766a582396a
Parents: 149d52b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jan 12 13:10:40 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 13 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apache/beam/runners/core/DoFnRunners.java   | 137 +------------------
 .../beam/runners/direct/ParDoEvaluator.java     |   9 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |  62 ++++-----
 .../runners/spark/translation/DoFnFunction.java |  11 +-
 .../spark/translation/MultiDoFnFunction.java    |   9 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    |  17 +--
 8 files changed, 55 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 1a3387c..de4c15d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -305,7 +305,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       sideOutputPortMapping.put(sideOutputTags.get(i), port);
     }
 
-    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         pipelineOptions.get(),
         doFn,
         sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 820bfcd..2f3e93c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -18,9 +18,7 @@
 package org.apache.beam.runners.core;
 
 import java.util.List;
-import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -53,7 +51,7 @@ public class DoFnRunners {
    * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key
    * partitioning needed, etc.
    */
-  static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
       PipelineOptions options,
       DoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
@@ -119,137 +117,4 @@ public class DoFnRunners {
         stepContext.timerInternals(),
         droppedDueToLatenessAggregator);
   }
-
-  /**
-   * Creates a {@link DoFnRunner} for the provided {@link DoFn}.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
-    return simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-  }
-
-  /**
-   * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}.
-   *
-   * <p>In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized
-   * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special
-   * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts
-   * dropped elements.
-   *
-   * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn}
-   */
-  @Deprecated
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-
-    DoFnRunner<InputT, OutputT> doFnRunner = simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-
-    if (!(doFn instanceof ReduceFnExecutor)) {
-      return doFnRunner;
-    } else {
-      // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped
-      // elements and we also learn that for some K and V,
-      //   InputT = KeyedWorkItem<K, V>
-      //   OutputT = KV<K, V>
-
-      Aggregator<Long, Long> droppedDueToLatenessAggregator =
-          ((ReduceFnExecutor<?, ?, ?, ?>) doFn).getDroppedDueToLatenessAggregator();
-
-      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
-      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
-          (DoFnRunner) doFnRunner,
-          stepContext,
-          (WindowingStrategy) windowingStrategy,
-          droppedDueToLatenessAggregator);
-
-      return runner;
-    }
-  }
-
-  /**
-   * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or
-   * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such
-   * class, but merely deserialize a payload and pass it to this method.
-   *
-   * @deprecated for migration purposes only for services where users may still submit either {@link
-   *     OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the
-   *     variant for that instead.
-   */
-  @Deprecated
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      Object deserializedFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    if (deserializedFn instanceof DoFn) {
-      return createDefault(
-          options,
-          (DoFn) deserializedFn,
-          sideInputReader,
-          outputManager,
-          mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          aggregatorFactory,
-          windowingStrategy);
-    } else if (deserializedFn instanceof OldDoFn) {
-      return createDefault(
-          options,
-          (OldDoFn) deserializedFn,
-          sideInputReader,
-          outputManager,
-          mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          aggregatorFactory,
-          windowingStrategy);
-    } else {
-      throw new IllegalArgumentException(String.format("Cannot create %s for %s of class %s",
-          DoFnRunner.class.getSimpleName(),
-          deserializedFn,
-          deserializedFn.getClass()));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index e146470..97d5360 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -47,7 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       DirectStepContext stepContext,
       AppliedPTransform<?, ?, ?> application,
       WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
-      Serializable fn, // may be OldDoFn or DoFn
+      DoFn<InputT, OutputT> fn,
       StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
@@ -72,8 +72,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
 
     ReadyCheckingSideInputReader sideInputReader =
         evaluationContext.createSideInputReader(sideInputs);
+
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> underlying =
-        DoFnRunners.createDefault(
+        DoFnRunners.simpleRunner(
             evaluationContext.getPipelineOptions(),
             fn,
             sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 95f2bfd..90cdf4c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -244,7 +244,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       sideInputReader = sideInputHandler;
     }
 
-    DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
+    DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
         oldDoFn,
         sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index b84def8..0c5be90 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
@@ -29,14 +27,13 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Wrapper class holding the necessary information to serialize a {@link OldDoFn}
- * or {@link DoFn}.
+ * Wrapper class holding the necessary information to serialize a {@link DoFn}.
  *
- * @param <InputT> the type of the (main) input elements of the {@link OldDoFn}
- * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn}
+ * @param <InputT> the type of the (main) input elements of the {@link DoFn}
+ * @param <OutputT> the type of the (main) output elements of the {@link DoFn}
  */
 public class DoFnInfo<InputT, OutputT> implements Serializable {
-  private final Serializable doFn;
+  private final DoFn<InputT, OutputT> doFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Iterable<PCollectionView<?>> sideInputViews;
   private final Coder<InputT> inputCoder;
@@ -48,17 +45,37 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
    * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
    */
   public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
+    return new DoFnInfo<>(
+        doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+  }
+
+  /** TODO: remove this when Dataflow worker uses the DoFn overload. */
+  @Deprecated
+  @SuppressWarnings("unchecked")
+  public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
       Serializable doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Iterable<PCollectionView<?>> sideInputViews,
       Coder<InputT> inputCoder,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
-    return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+    return forFn(
+        (DoFn<InputT, OutputT>) doFn,
+        windowingStrategy,
+        sideInputViews,
+        inputCoder,
+        mainOutput,
+        outputMap);
   }
 
   private DoFnInfo(
-      Serializable doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Iterable<PCollectionView<?>> sideInputViews,
       Coder<InputT> inputCoder,
@@ -72,34 +89,15 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
     this.outputMap = outputMap;
   }
 
-  /**
-   * @deprecated use {@link #forFn}.
-   */
+  /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */
   @Deprecated
-  public DoFnInfo(
-      OldDoFn doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Iterable<PCollectionView<?>> sideInputViews,
-      Coder<InputT> inputCoder,
-      long mainOutput,
-      Map<Long, TupleTag<?>> outputMap) {
-    this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
-  }
-
-  /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */
   public Serializable getFn() {
     return doFn;
   }
 
-  /** @deprecated use {@link #getFn()} */
-  @Deprecated
-  public OldDoFn getDoFn() {
-    checkState(
-        doFn instanceof OldDoFn,
-        "Deprecated %s.getDoFn() called when the payload was actually a new %s",
-        DoFnInfo.class.getSimpleName(),
-        DoFn.class.getSimpleName());
-    return (OldDoFn) doFn;
+  /** Returns the embedded function. */
+  public DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
   }
 
   public WindowingStrategy<?, ?> getWindowingStrategy() {

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index af8e089..bd6cfbe 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -80,18 +80,21 @@ public class DoFnFunction<InputT, OutputT>
       Iterator<WindowedValue<InputT>> iter) throws Exception {
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
+
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
-        DoFnRunners.createDefault(
+        DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),
             doFn,
             new SparkSideInputReader(sideInputs),
             outputManager,
-            new TupleTag<OutputT>() {},
+            new TupleTag<OutputT>() {
+            },
             Collections.<TupleTag<?>>emptyList(),
             new SparkProcessContext.NoOpStepContext(),
             new SparkAggregators.Factory(runtimeContext, accumulator),
-            windowingStrategy
-        );
+            windowingStrategy);
 
     return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 0f9417a..cceffc8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import scala.Tuple2;
 
 
@@ -88,8 +87,11 @@ public class MultiDoFnFunction<InputT, OutputT>
       Iterator<WindowedValue<InputT>> iter) throws Exception {
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
+
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
-        DoFnRunners.createDefault(
+        DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),
             doFn,
             new SparkSideInputReader(sideInputs),
@@ -98,8 +100,7 @@ public class MultiDoFnFunction<InputT, OutputT>
             Collections.<TupleTag<?>>emptyList(),
             new SparkProcessContext.NoOpStepContext(),
             new SparkAggregators.Factory(runtimeContext, accumulator),
-            windowingStrategy
-        );
+            windowingStrategy);
 
     return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index b141d51..33c5a6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -36,21 +36,12 @@ public class DoFnInvokers {
     return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
   }
 
-  /**
-   * Temporarily retained for compatibility with Dataflow worker.
-   * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
-   *
-   * @deprecated Use {@link #invokerFor(DoFn)}.
-   */
-  @SuppressWarnings("unchecked")
+  /** TODO: remove this when Dataflow worker uses the DoFn overload. */
   @Deprecated
+  @SuppressWarnings({"unchecked"})
   public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
-      Serializable deserializedFn) {
-    if (deserializedFn instanceof DoFn) {
-      return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
-    }
-    throw new UnsupportedOperationException(
-        "Only DoFn supported, was: " + deserializedFn.getClass());
+      Serializable fn) {
+    return invokerFor((DoFn) fn);
   }
 
   private DoFnInvokers() {}


[7/7] beam git commit: This closes #1773: Removes or moves away all OldDoFn code from SDK except OldDoFn itself

Posted by ke...@apache.org.
This closes #1773: Removes or moves away all OldDoFn code from SDK except OldDoFn itself

  Points Dataflow runner to updated worker images
  Moves PerKeyCombineFnRunners to Flink runner
  Removes some OldDoFn code from DoFnRunners
  Moves DoFnAdapters to runners-core
  Removes ArgumentProvider.windowingInternals
  Removes code for wrapping DoFn as an OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a91571ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a91571ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a91571ef

Branch: refs/heads/master
Commit: a91571ef9eea8f2341e3906d42f963eb278060e3
Parents: eaf4450 b17e5b0
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jan 17 09:30:24 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 17 09:30:24 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   4 +-
 .../apache/beam/runners/core/DoFnAdapters.java  | 344 +++++++++++++
 .../apache/beam/runners/core/DoFnRunners.java   | 137 +----
 .../runners/core/PerKeyCombineFnRunner.java     |  25 -
 .../runners/core/PerKeyCombineFnRunners.java    | 262 ----------
 .../beam/runners/core/SimpleDoFnRunner.java     |  63 ---
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   4 +-
 .../beam/runners/core/SplittableParDo.java      |   7 -
 .../core/GroupAlsoByWindowsProperties.java      |   2 +-
 .../beam/runners/direct/ParDoEvaluator.java     |   9 +-
 .../runners/flink/PerKeyCombineFnRunners.java   | 239 +++++++++
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../FlinkMergingPartialReduceFunction.java      |   2 +-
 .../functions/FlinkMergingReduceFunction.java   |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   4 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |  62 ++-
 .../beam/runners/dataflow/dataflow.properties   |   4 +-
 .../runners/spark/translation/DoFnFunction.java |  11 +-
 .../spark/translation/MultiDoFnFunction.java    |   9 +-
 .../sdk/transforms/AggregatorRetriever.java     |  13 +-
 .../beam/sdk/transforms/DoFnAdapters.java       | 504 -------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  |   7 -
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 297 +----------
 .../sdk/transforms/reflect/DoFnInvoker.java     |  20 -
 .../sdk/transforms/reflect/DoFnInvokers.java    | 142 +-----
 .../beam/sdk/util/CombineContextFactory.java    |  18 -
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |   2 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  42 --
 35 files changed, 671 insertions(+), 1580 deletions(-)
----------------------------------------------------------------------



[4/7] beam git commit: Moves PerKeyCombineFnRunners to Flink runner

Posted by ke...@apache.org.
Moves PerKeyCombineFnRunners to Flink runner

Flink is its only user. This removes the only remaining
mentions of OldDoFn in the SDK that are not OldDoFn itself.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e382c401
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e382c401
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e382c401

Branch: refs/heads/master
Commit: e382c40187754ad4f3c20565675cb3f131528070
Parents: 2b26ec8
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jan 12 13:17:11 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 13 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunner.java     |  25 --
 .../runners/core/PerKeyCombineFnRunners.java    | 262 -------------------
 .../runners/flink/PerKeyCombineFnRunners.java   | 239 +++++++++++++++++
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../FlinkMergingPartialReduceFunction.java      |   2 +-
 .../functions/FlinkMergingReduceFunction.java   |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../beam/sdk/util/CombineContextFactory.java    |  18 --
 9 files changed, 244 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
index a927ecd..4550273 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
@@ -75,31 +75,6 @@ public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Seria
    */
   OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
 
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output
-   * in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c);
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
deleted file mode 100644
index 34d711b..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      throw new IllegalStateException(
-          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.apply(key, inputs);
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      AccumT accum = keyedCombineFn.createAccumulator(key);
-      for (InputT input : inputs) {
-        accum = keyedCombineFn.addInput(key, accum, input);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.apply(key, inputs,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
-      AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
-      for (InputT input : inputs) {
-        accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..f672578
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+  /**
+   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      return new KeyedCombineFnWithContextRunner<>(
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+      return new KeyedCombineFnRunner<>(
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+    }
+  }
+
+  /** Returns a {@code CombineWithContext.Context} that wraps an {@code OldDoFn.ProcessContext}. */
+  private static CombineWithContext.Context createFromProcessContext(
+      final OldDoFn<?, ?>.ProcessContext c) {
+    return new CombineWithContext.Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return c.sideInput(view);
+      }
+    };
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+   *
+   * <p>It forwards function calls to the {@link KeyedCombineFn}.
+   */
+  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    private KeyedCombineFnRunner(
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+    }
+
+    @Override
+    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFn;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.compact(key, accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+   *
+   * <p>It forwards function calls to the {@link KeyedCombineFnWithContext}.
+   */
+  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+    private KeyedCombineFnWithContextRunner(
+        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+    }
+
+    @Override
+    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFnWithContext;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, value,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.mergeAccumulators(
+          key, accumulators, createFromProcessContext(c));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFnWithContext.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, input,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.compact(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 041d0e8..6412e63 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index fef7921..1456eea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 59163e9..2f56fac 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 8b6ec3a..627cfa6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index fb5c90c..de0d416 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index 149d276..a983057 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -49,23 +48,6 @@ public class CombineContextFactory {
   }
 
   /**
-   * Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}.
-   */
-  public static Context createFromProcessContext(final OldDoFn<?, ?>.ProcessContext c) {
-    return new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-    };
-  }
-
-  /**
    * Returns a {@code Combine.Context} that wraps a {@link StateContext}.
    */
   public static Context createFromStateContext(final StateContext<?> c) {


[6/7] beam git commit: Points Dataflow runner to updated worker images

Posted by ke...@apache.org.
Points Dataflow runner to updated worker images


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b17e5b0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b17e5b0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b17e5b0b

Branch: refs/heads/master
Commit: b17e5b0b768f8338f98c6e6f8c90c448bf460b65
Parents: e382c40
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jan 12 19:13:33 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 13 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/dataflow.properties         | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b17e5b0b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 77345d2..bf73f89 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 
 environment.major.version=6
 
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170106
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170112
 
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170106
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170112