You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:10:55 UTC

[01/14] incubator-beam git commit: Add adapter from OldDoFn.RequiresWindowAccess to DoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8dfadbf01 -> c390a2a7f


Add adapter from OldDoFn.RequiresWindowAccess to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/164ee56b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/164ee56b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/164ee56b

Branch: refs/heads/master
Commit: 164ee56b41e01c0ee637eff24e23a814b5885e6f
Parents: a9a41eb
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:18 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:50 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 89 +++++++++++++++++---
 1 file changed, 76 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/164ee56b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index e3cfc38..72c2965 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -77,19 +77,12 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
   public DoFn<InputT, OutputT> toDoFn() {
     if (this instanceof RequiresWindowAccess) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Cannot convert %s to %s because it implements %s."
-                  + " Please convert your %s to a %s directly.",
-              getClass(),
-              DoFn.class.getSimpleName(),
-              RequiresWindowAccess.class.getSimpleName(),
-              OldDoFn.class.getSimpleName(),
-              DoFn.class.getSimpleName()));
-    }
-
-    // No parameters as it just accesses `this`
-    return new AdaptedDoFn();
+      // No parameters as it just accesses `this`
+      return new AdaptedRequiresWindowAccessDoFn();
+    } else {
+      // No parameters as it just accesses `this`
+      return new AdaptedDoFn();
+    }
   }
 
   /**
@@ -770,4 +763,74 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
       return OldDoFn.this.getOutputTypeDescriptor();
     }
   }
+
+  /**
+   * A {@link ProcessContext} for an {@link OldDoFn} that implements
+   * {@link OldDoFn.RequiresWindowAccess}, via a context for a proper {@link DoFn}.
+   */
+  private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
+
+    private final BoundedWindow window;
+
+    public AdaptedRequiresWindowAccessProcessContext(
+        DoFn<InputT, OutputT>.ProcessContext newContext,
+        BoundedWindow window) {
+      super(newContext);
+      this.window = window;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return window;
+    }
+  }
+
+  private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
+
+    @Setup
+    public void setup() throws Exception {
+      OldDoFn.this.setup();
+    }
+
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      OldDoFn.this.processElement(
+          OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      OldDoFn.this.teardown();
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return OldDoFn.this.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      OldDoFn.this.populateDisplayData(builder);
+    }
+
+    @Override
+    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+      return OldDoFn.this.getInputTypeDescriptor();
+    }
+
+    @Override
+    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+      return OldDoFn.this.getOutputTypeDescriptor();
+    }
+  }
 }


[05/14] incubator-beam git commit: Add setupDelegatingAggregators for DoFn (for now)

Posted by ke...@apache.org.
Add setupDelegatingAggregators for DoFn (for now)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2e751f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2e751f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2e751f4

Branch: refs/heads/master
Commit: c2e751f49d72968f2478931cdb884fd4af173610
Parents: 08dd149
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:53:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |  1 +
 .../org/apache/beam/sdk/transforms/DoFn.java    | 24 ++++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0360bc2..1cf56a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -228,6 +228,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       this.stepContext = stepContext;
       this.aggregatorFactory = aggregatorFactory;
       this.windowFn = windowFn;
+      super.setupDelegateAggregators();
     }
 
     //////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0531cbb..11ca853 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -214,6 +214,30 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     protected abstract <AggInputT, AggOutputT>
         Aggregator<AggInputT, AggOutputT> createAggregator(
             String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+    /**
+     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+     *
+     * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected final void setupDelegateAggregators() {
+      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+        setupDelegateAggregator(aggregator);
+      }
+
+      aggregatorsAreFinal = true;
+    }
+
+    private <AggInputT, AggOutputT> void setupDelegateAggregator(
+        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+      Aggregator<AggInputT, AggOutputT> delegate = createAggregator(
+          aggregator.getName(), aggregator.getCombineFn());
+
+      aggregator.setDelegate(delegate);
+    }
+
   }
 
   /**


[12/14] incubator-beam git commit: Port direct runner to use new DoFn directly

Posted by ke...@apache.org.
Port direct runner to use new DoFn directly


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1919d8b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1919d8b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1919d8b3

Branch: refs/heads/master
Commit: 1919d8b3a850bd146137652546da851ee461cd28
Parents: f0c8d30
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 20:55:00 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DoFnLifecycleManager.java    | 42 +++++++++--------
 .../beam/runners/direct/ParDoEvaluator.java     |  3 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |  6 +--
 .../direct/ParDoSingleEvaluatorFactory.java     |  5 +-
 ...leManagerRemovingTransformEvaluatorTest.java | 16 +++----
 .../direct/DoFnLifecycleManagerTest.java        | 12 ++---
 .../direct/DoFnLifecycleManagersTest.java       | 48 ++++++++++++++++----
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 11 +++++
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 11 +++++
 .../beam/runners/direct/SplittableDoFnTest.java |  8 +++-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 23 ++++++++--
 11 files changed, 130 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 0e15c18..23460b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -26,7 +26,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn.Setup;
+import org.apache.beam.sdk.transforms.DoFn.Teardown;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,49 +37,49 @@ import org.slf4j.LoggerFactory;
  * Manages {@link DoFn} setup, teardown, and serialization.
  *
  * <p>{@link DoFnLifecycleManager} is similar to a {@link ThreadLocal} storing a {@link DoFn}, but
- * calls the {@link DoFn} {@link Setup} the first time the {@link DoFn} is obtained and {@link
- * Teardown} whenever the {@link DoFn} is removed, and provides a method for clearing all cached
- * {@link DoFn DoFns}.
+ * calls the {@link DoFn} {@link Setup @Setup} method the first time the {@link DoFn} is obtained
+ * and {@link Teardown @Teardown} whenever the {@link DoFn} is removed, and provides a method for
+ * clearing all cached {@link DoFn DoFns}.
  */
 class DoFnLifecycleManager {
   private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class);
 
-  public static DoFnLifecycleManager of(OldDoFn<?, ?> original) {
+  public static DoFnLifecycleManager of(DoFn<?, ?> original) {
     return new DoFnLifecycleManager(original);
   }
 
-  private final LoadingCache<Thread, OldDoFn<?, ?>> outstanding;
+  private final LoadingCache<Thread, DoFn<?, ?>> outstanding;
 
-  private DoFnLifecycleManager(OldDoFn<?, ?> original) {
+  private DoFnLifecycleManager(DoFn<?, ?> original) {
     this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original));
   }
 
-  public OldDoFn<?, ?> get() throws Exception {
+  public DoFn<?, ?> get() throws Exception {
     Thread currentThread = Thread.currentThread();
     return outstanding.get(currentThread);
   }
 
   public void remove() throws Exception {
     Thread currentThread = Thread.currentThread();
-    OldDoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
-    fn.teardown();
+    DoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
+    DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
   }
 
   /**
    * Remove all {@link DoFn DoFns} from this {@link DoFnLifecycleManager}. Returns all exceptions
    * that were thrown while calling the remove methods.
    *
-   * <p>If the returned Collection is nonempty, an exception was thrown from at least one
-   * {@link DoFn#teardown()} method, and the {@link PipelineRunner} should throw an exception.
+   * <p>If the returned Collection is nonempty, an exception was thrown from at least one {@link
+   * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception.
    */
   public Collection<Exception> removeAll() throws Exception {
-    Iterator<OldDoFn<?, ?>> fns = outstanding.asMap().values().iterator();
+    Iterator<DoFn<?, ?>> fns = outstanding.asMap().values().iterator();
     Collection<Exception> thrown = new ArrayList<>();
     while (fns.hasNext()) {
-      OldDoFn<?, ?> fn = fns.next();
+      DoFn<?, ?> fn = fns.next();
       fns.remove();
       try {
-        fn.teardown();
+        DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
       } catch (Exception e) {
         thrown.add(e);
       }
@@ -85,18 +87,18 @@ class DoFnLifecycleManager {
     return thrown;
   }
 
-  private class DeserializingCacheLoader extends CacheLoader<Thread, OldDoFn<?, ?>> {
+  private class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> {
     private final byte[] original;
 
-    public DeserializingCacheLoader(OldDoFn<?, ?> original) {
+    public DeserializingCacheLoader(DoFn<?, ?> original) {
       this.original = SerializableUtils.serializeToByteArray(original);
     }
 
     @Override
-    public OldDoFn<?, ?> load(Thread key) throws Exception {
-      OldDoFn<?, ?> fn = (OldDoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
+    public DoFn<?, ?> load(Thread key) throws Exception {
+      DoFn<?, ?> fn = (DoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
           "DoFn Copy in thread " + key.getName());
-      fn.setup();
+      DoFnInvokers.INSTANCE.invokerFor(fn).invokeSetup();
       return fn;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a59fb4d..b524dfa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -45,7 +46,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
       DirectStepContext stepContext,
       CommittedBundle<InputT> inputBundle,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
-      Object fn,
+      Serializable fn, // may be OldDoFn or DoFn
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index d909e8b..02469ff 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.values.PCollection;
@@ -50,7 +50,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
           public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
               throws Exception {
             BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform();
-            return DoFnLifecycleManager.of(bound.getFn());
+            return DoFnLifecycleManager.of(bound.getNewFn());
           }
         });
   }
@@ -87,7 +87,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
               stepContext,
               inputBundle,
               application,
-              (OldDoFn) fnLocal.get(),
+              (DoFn) fnLocal.get(),
               application.getTransform().getSideInputs(),
               application.getTransform().getMainOutputTag(),
               application.getTransform().getSideOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 1a06ea6..0584e41 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
                   public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
                       throws Exception {
                     Bound<?, ?> bound = (Bound<?, ?>) key.getTransform();
-                    return DoFnLifecycleManager.of(bound.getFn());
+                    return DoFnLifecycleManager.of(bound.getNewFn());
                   }
                 });
   }
@@ -92,7 +91,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
               stepContext,
               inputBundle,
               application,
-              (OldDoFn) fnLocal.get(),
+              fnLocal.get(),
               application.getTransform().getSideInputs(),
               mainOutputTag,
               Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 2e4fee2..9e2732e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -27,7 +27,7 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -50,7 +50,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   @Test
   public void delegatesToUnderlying() throws Exception {
     RecordingTransformEvaluator underlying = new RecordingTransformEvaluator();
-    OldDoFn<?, ?> original = lifecycleManager.get();
+    DoFn<?, ?> original = lifecycleManager.get();
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
     WindowedValue<Object> first = WindowedValue.valueInGlobalWindow(new Object());
@@ -67,7 +67,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   @Test
   public void removesOnExceptionInProcessElement() throws Exception {
     ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
-    OldDoFn<?, ?> original = lifecycleManager.get();
+    DoFn<?, ?> original = lifecycleManager.get();
     assertThat(original, not(nullValue()));
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -75,7 +75,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
     try {
       evaluator.processElement(WindowedValue.valueInGlobalWindow(new Object()));
     } catch (Exception e) {
-      assertThat(lifecycleManager.get(), not(Matchers.<OldDoFn<?, ?>>theInstance(original)));
+      assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
       return;
     }
     fail("Expected ThrowingTransformEvaluator to throw on method call");
@@ -84,7 +84,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   @Test
   public void removesOnExceptionInFinishBundle() throws Exception {
     ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
-    OldDoFn<?, ?> original = lifecycleManager.get();
+    DoFn<?, ?> original = lifecycleManager.get();
     // the LifecycleManager is set when the evaluator starts
     assertThat(original, not(nullValue()));
     TransformEvaluator<Object> evaluator =
@@ -94,7 +94,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
       evaluator.finishBundle();
     } catch (Exception e) {
       assertThat(lifecycleManager.get(),
-          Matchers.not(Matchers.<OldDoFn<?, ?>>theInstance(original)));
+          Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
       return;
     }
     fail("Expected ThrowingTransformEvaluator to throw on method call");
@@ -134,8 +134,8 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   }
 
 
-  private static class TestFn extends OldDoFn<Object, Object> {
-    @Override
+  private static class TestFn extends DoFn<Object, Object> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
index 1f0af99..aef9d29 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -101,7 +101,7 @@ public class DoFnLifecycleManagerTest {
     assertThat(obtained.setupCalled, is(true));
     assertThat(obtained.teardownCalled, is(true));
 
-    assertThat(mgr.get(), not(Matchers.<OldDoFn<?, ?>>theInstance(obtained)));
+    assertThat(mgr.get(), not(Matchers.<DoFn<?, ?>>theInstance(obtained)));
   }
 
   @Test
@@ -142,11 +142,11 @@ public class DoFnLifecycleManagerTest {
   }
 
 
-  private static class TestFn extends OldDoFn<Object, Object> {
+  private static class TestFn extends DoFn<Object, Object> {
     boolean setupCalled = false;
     boolean teardownCalled = false;
 
-    @Override
+    @Setup
     public void setup() {
       checkState(!setupCalled);
       checkState(!teardownCalled);
@@ -154,11 +154,11 @@ public class DoFnLifecycleManagerTest {
       setupCalled = true;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
 
-    @Override
+    @Teardown
     public void teardown() {
       checkState(setupCalled);
       checkState(!teardownCalled);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
index 39a4a9d..a19ff99 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
@@ -18,12 +18,15 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -51,9 +54,15 @@ public class DoFnLifecycleManagersTest {
     third.get();
 
     final Collection<Matcher<? super Throwable>> suppressions = new ArrayList<>();
-    suppressions.add(new ThrowableMessageMatcher("foo"));
-    suppressions.add(new ThrowableMessageMatcher("bar"));
-    suppressions.add(new ThrowableMessageMatcher("baz"));
+    suppressions.add(allOf(
+        instanceOf(UserCodeException.class),
+        new CausedByMatcher(new ThrowableMessageMatcher("foo"))));
+    suppressions.add(allOf(
+        instanceOf(UserCodeException.class),
+        new CausedByMatcher(new ThrowableMessageMatcher("bar"))));
+    suppressions.add(allOf(
+        instanceOf(UserCodeException.class),
+        new CausedByMatcher(new ThrowableMessageMatcher("baz"))));
 
     thrown.expect(
         new BaseMatcher<Exception>() {
@@ -90,18 +99,18 @@ public class DoFnLifecycleManagersTest {
     DoFnLifecycleManagers.removeAllFromManagers(ImmutableList.of(first, second, third));
   }
 
-  private static class ThrowsInCleanupFn extends OldDoFn<Object, Object> {
+  private static class ThrowsInCleanupFn extends DoFn<Object, Object> {
     private final String message;
 
     private ThrowsInCleanupFn(String message) {
       this.message = message;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
 
-    @Override
+    @Teardown
     public void teardown() throws Exception {
       throw new Exception(message);
     }
@@ -130,9 +139,32 @@ public class DoFnLifecycleManagersTest {
     }
   }
 
+  private static class CausedByMatcher extends BaseMatcher<Throwable> {
+    private final Matcher<Throwable> causeMatcher;
+
+    public CausedByMatcher(
+        Matcher<Throwable> causeMatcher) {
+      this.causeMatcher = causeMatcher;
+    }
 
-  private static class EmptyFn extends OldDoFn<Object, Object> {
     @Override
+    public boolean matches(Object item) {
+      if (!(item instanceof UserCodeException)) {
+        return false;
+      }
+      UserCodeException that = (UserCodeException) item;
+      return causeMatcher.matches(that.getCause());
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("a throwable with a cause ").appendDescriptionOf(causeMatcher);
+    }
+  }
+
+
+  private static class EmptyFn extends DoFn<Object, Object> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 88e1484..8b0070b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -236,6 +237,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
             WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
   }
 
+  /**
+   * This test is ignored, as today testing of GroupByKey covers all the state that needs testing.
+   * It should be ported to use state once state support is ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStatePutsStateInResult() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -321,6 +327,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         containsInAnyOrder("foo", "bara", "bazam"));
   }
 
+  /**
+   * This test is ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
     TestPipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 6a02e40..e562b28 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -166,6 +167,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     assertThat(result.getAggregatorChanges(), equalTo(mutator));
   }
 
+  /**
+   * This test is ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStatePutsStateInResult() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -238,6 +244,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
         containsInAnyOrder("foo", "bara", "bazam"));
   }
 
+  /**
+   * This test is ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
     TestPipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
index 84a0cd9..c164ce6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,6 +45,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.MutableDateTime;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -140,6 +140,9 @@ public class SplittableDoFnTest {
     }
   }
 
+  @Ignore(
+      "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
+          + "It must be implemented as a primitive.")
   @Test
   public void testPairWithIndexBasic() throws ClassNotFoundException {
     Pipeline p = TestPipeline.create();
@@ -164,6 +167,9 @@ public class SplittableDoFnTest {
     p.run();
   }
 
+  @Ignore(
+      "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
+          + "It must be implemented as a primitive.")
   @Test
   public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
     // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index a9f26a4..f16e0b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -73,6 +73,10 @@ import org.joda.time.Instant;
 public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
 
   public DoFn<InputT, OutputT> toDoFn() {
+    DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
+    if (doFn != null) {
+      return doFn;
+    }
     if (this instanceof RequiresWindowAccess) {
       // No parameters as it just accesses `this`
       return new AdaptedRequiresWindowAccessDoFn();
@@ -553,8 +557,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
     private final DoFn<InputT, OutputT>.ProcessContext newContext;
 
-    public AdaptedProcessContext(
-        DoFn<InputT, OutputT>.ProcessContext newContext) {
+    public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
       this.newContext = newContext;
     }
 
@@ -632,21 +635,31 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
   private class AdaptedDoFn extends DoFn<InputT, OutputT> {
 
+    @Setup
+    public void setup() throws Exception {
+      OldDoFn.this.setup();
+    }
+
     @StartBundle
-    public void startBundle(DoFn.Context c) throws Exception {
+    public void startBundle(Context c) throws Exception {
       OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
     }
 
     @ProcessElement
-    public void processElement(DoFn.ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
       OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
     }
 
     @FinishBundle
-    public void finishBundle(DoFn.Context c) throws Exception {
+    public void finishBundle(Context c) throws Exception {
       OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
     }
 
+    @Teardown
+    public void teardown() throws Exception {
+      OldDoFn.this.teardown();
+    }
+
     @Override
     public Duration getAllowedTimestampSkew() {
       return OldDoFn.this.getAllowedTimestampSkew();


[09/14] incubator-beam git commit: Fix binding of aggregator delegation in DoFnAdapters

Posted by ke...@apache.org.
Fix binding of aggregator delegation in DoFnAdapters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ab955d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ab955d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ab955d5

Branch: refs/heads/master
Commit: 2ab955d5501c87adea3fd17ad5dd1ad73be13364
Parents: 4e185d0
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:44:47 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab955d5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 0b0d207..ca724cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -278,6 +278,7 @@ public class DoFnAdapters {
     private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
       fn.super();
       this.context = context;
+      super.setupDelegateAggregators();
     }
 
     @Override


[08/14] incubator-beam git commit: Fix binding of aggregator creating in OldDoFn

Posted by ke...@apache.org.
Fix binding of aggregator creating in OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e185d0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e185d0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e185d0b

Branch: refs/heads/master
Commit: 4e185d0b0a7ec4c096380a25b9cbe4703621ec6b
Parents: 3094017
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:44:30 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e185d0b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index b269f47..a9f26a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -511,6 +511,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     public AdaptedContext(
         DoFn<InputT, OutputT>.Context newContext) {
       this.newContext = newContext;
+      super.setupDelegateAggregators();
     }
 
     @Override
@@ -541,7 +542,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     @Override
     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return null;
+      return newContext.createAggregator(name, combiner);
     }
   }
 
@@ -625,7 +626,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     @Override
     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return null;
+      return newContext.createAggregator(name, combiner);
     }
   }
 


[14/14] incubator-beam git commit: This closes #1157

Posted by ke...@apache.org.
This closes #1157


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c390a2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c390a2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c390a2a7

Branch: refs/heads/master
Commit: c390a2a7ff1b3a58de213f85218eef689e712df4
Parents: 8dfadbf 2089c5c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 09:10:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 24 09:10:04 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/AggregatorFactory.java    |  39 ++++
 .../beam/runners/core/SimpleDoFnRunner.java     |   8 +-
 .../runners/direct/DoFnLifecycleManager.java    |  42 ++--
 .../beam/runners/direct/ParDoEvaluator.java     |   3 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |   6 +-
 .../direct/ParDoSingleEvaluatorFactory.java     |   5 +-
 ...leManagerRemovingTransformEvaluatorTest.java |  16 +-
 .../direct/DoFnLifecycleManagerTest.java        |  12 +-
 .../direct/DoFnLifecycleManagersTest.java       |  48 ++++-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |  11 +
 .../direct/ParDoSingleEvaluatorFactoryTest.java |  11 +
 .../beam/runners/direct/SplittableDoFnTest.java |   8 +-
 .../functions/FlinkProcessContext.java          |  10 +-
 .../apache/beam/sdk/transforms/Aggregator.java  |  11 +-
 .../sdk/transforms/DelegatingAggregator.java    | 125 +++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  55 +++--
 .../beam/sdk/transforms/DoFnAdapters.java       |   1 +
 .../apache/beam/sdk/transforms/DoFnTester.java  |  18 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 214 +++++++++----------
 .../sdk/util/ReifyTimestampAndWindowsDoFn.java  |  16 +-
 .../org/apache/beam/sdk/util/StringUtils.java   |   2 +-
 .../DoFnDelegatingAggregatorTest.java           |   5 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  16 +-
 23 files changed, 463 insertions(+), 219 deletions(-)
----------------------------------------------------------------------



[07/14] incubator-beam git commit: Move shared DelegatingAggregator out of OldDoFn

Posted by ke...@apache.org.
Move shared DelegatingAggregator out of OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/139437bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/139437bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/139437bd

Branch: refs/heads/master
Commit: 139437bdca8872a11f6a87a9f54347985523faf2
Parents: 0d500ef
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:45:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/DelegatingAggregator.java    | 125 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  30 ++---
 .../org/apache/beam/sdk/transforms/OldDoFn.java |  97 --------------
 .../DoFnDelegatingAggregatorTest.java           |   5 +-
 4 files changed, 142 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
new file mode 100644
index 0000000..d92bb71
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator.
+ *
+ * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline
+ * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it
+ * submits values must be provided by the runner at execution time.
+ *
+ * @param <AggInputT> the type of input element
+ * @param <AggOutputT> the type of output element
+ */
+class DelegatingAggregator<AggInputT, AggOutputT>
+    implements Aggregator<AggInputT, AggOutputT>, Serializable {
+  private final UUID id;
+
+  private final String name;
+
+  private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
+
+  private Aggregator<AggInputT, ?> delegate;
+
+  public DelegatingAggregator(String name,
+      CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+    this.id = UUID.randomUUID();
+    this.name = checkNotNull(name, "name cannot be null");
+    // Safe contravariant cast
+    @SuppressWarnings("unchecked")
+    CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
+        (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
+    this.combineFn = specificCombiner;
+  }
+
+  @Override
+  public void addValue(AggInputT value) {
+    if (delegate == null) {
+      throw new IllegalStateException(
+          String.format(
+              "addValue cannot be called on Aggregator outside of the execution of a %s.",
+              DoFn.class.getSimpleName()));
+    } else {
+      delegate.addValue(value);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
+    return combineFn;
+  }
+
+  /**
+   * Sets the current delegate of the Aggregator.
+   *
+   * @param delegate the delegate to set in this aggregator
+   */
+  public void setDelegate(Aggregator<AggInputT, ?> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("name", name)
+        .add("combineFn", combineFn)
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, name, combineFn.getClass());
+  }
+
+  /**
+   * Indicates whether some other object is "equal to" this one.
+   *
+   * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
+   * CombineFns are the same class, and they have identical IDs.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (o instanceof DelegatingAggregator) {
+      DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
+      return Objects.equals(this.id, that.id)
+          && Objects.equals(this.name, that.name)
+          && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 8b3aaf8..0531cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -775,31 +775,31 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   }
 
   /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created
-   * during pipeline construction.
+   * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}
+   * argument. The name provided must be unique across {@link Aggregator}s created within the {@link
+   * DoFn}. Aggregators can only be created during pipeline construction.
    *
    * @param name the name of the aggregator
    * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this {@link DoFn}
+   * @return an aggregator for the provided name and combiner in the scope of this {@link DoFn}
    * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
+   * @throws IllegalArgumentException if the given name collides with another aggregator in this
+   *     scope
    * @throws IllegalStateException if called during pipeline execution.
    */
-  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+      String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
     checkNotNull(name, "name cannot be null");
     checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
+    checkArgument(
+        !aggregators.containsKey(name),
         "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
+            + " An Aggregator with that name already exists within this scope.",
         name);
-    checkState(!aggregatorsAreFinal,
+    checkState(
+        !aggregatorsAreFinal,
         "Cannot create an aggregator during pipeline execution."
-        + " Aggregators should be registered during pipeline construction.");
+            + " Aggregators should be registered during pipeline construction.");
 
     DelegatingAggregator<AggInputT, AggOutputT> aggregator =
         new DelegatingAggregator<>(name, combiner);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 72c2965..b269f47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -505,100 +502,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   }
 
   /**
-   * An {@link Aggregator} that delegates calls to addValue to another
-   * aggregator.
-   *
-   * @param <AggInputT> the type of input element
-   * @param <AggOutputT> the type of output element
-   */
-  static class DelegatingAggregator<AggInputT, AggOutputT> implements
-      Aggregator<AggInputT, AggOutputT>, Serializable {
-    private final UUID id;
-
-    private final String name;
-
-    private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
-    private Aggregator<AggInputT, ?> delegate;
-
-    public DelegatingAggregator(String name,
-        CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-      this.id = UUID.randomUUID();
-      this.name = checkNotNull(name, "name cannot be null");
-      // Safe contravariant cast
-      @SuppressWarnings("unchecked")
-      CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
-          (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
-      this.combineFn = specificCombiner;
-    }
-
-    @Override
-    public void addValue(AggInputT value) {
-      if (delegate == null) {
-        throw new IllegalStateException(
-            "addValue cannot be called on Aggregator outside of the execution of a OldDoFn.");
-      } else {
-        delegate.addValue(value);
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    /**
-     * Sets the current delegate of the Aggregator.
-     *
-     * @param delegate the delegate to set in this aggregator
-     */
-    public void setDelegate(Aggregator<AggInputT, ?> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("name", name)
-          .add("combineFn", combineFn)
-          .toString();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(id, name, combineFn.getClass());
-    }
-
-    /**
-     * Indicates whether some other object is "equal to" this one.
-     *
-     * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
-     * CombineFns are the same class, and they have identical IDs.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (o instanceof DelegatingAggregator) {
-        DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
-        return Objects.equals(this.id, that.id)
-            && Objects.equals(this.name, that.name)
-            && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
-      }
-      return false;
-    }
-  }
-
-  /**
    * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
    */
   private class AdaptedContext extends Context {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
index c072fd7..f51a6b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,7 +34,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 /**
- * Tests for {@link OldDoFn.DelegatingAggregator}.
+ * Tests for {@link DelegatingAggregator}.
  */
 @RunWith(JUnit4.class)
 public class DoFnDelegatingAggregatorTest {
@@ -63,7 +62,7 @@ public class DoFnDelegatingAggregatorTest {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("cannot be called");
-    thrown.expectMessage("OldDoFn");
+    thrown.expectMessage("DoFn");
 
     aggregator.addValue(21.2);
   }


[03/14] incubator-beam git commit: Fix NPE in SimpleDoFnRunner

Posted by ke...@apache.org.
Fix NPE in SimpleDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d500efe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d500efe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d500efe

Branch: refs/heads/master
Commit: 0d500efe3b2d4248e2889dfcf36423aa473ee0a5
Parents: b235041
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:30:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d500efe/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 4eda376..0360bc2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collection;
@@ -511,7 +512,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
     @Override
     public BoundedWindow window() {
-      return null;
+      return Iterables.getOnlyElement(windows);
     }
 
     @Override


[06/14] incubator-beam git commit: Move AggregatorFactory to runners-core and deprecate SDK version

Posted by ke...@apache.org.
Move AggregatorFactory to runners-core and deprecate SDK version


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08dd1498
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08dd1498
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08dd1498

Branch: refs/heads/master
Commit: 08dd14981bad95a029be8ac758a6091c55850200
Parents: 139437b
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:49:07 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/AggregatorFactory.java    | 39 ++++++++++++++++++++
 .../apache/beam/sdk/transforms/Aggregator.java  | 11 ++----
 2 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dd1498/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
new file mode 100644
index 0000000..153d30d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+
+/**
+ * A factory for creating aggregators.
+ */
+public interface AggregatorFactory {
+  /**
+   * Create an aggregator with the given {@code name} and {@link CombineFn}.
+   *
+   *  <p>This method is called to create an aggregator for a {@link DoFn}. It receives the
+   *  class of the {@link DoFn} being executed and the context of the step it is being
+   *  executed in.
+   */
+  <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+      Class<?> fnClass, ExecutionContext.StepContext stepContext,
+      String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dd1498/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index e8f6247..13bf322 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -73,8 +73,10 @@ public interface Aggregator<InputT, OutputT> {
   CombineFn<InputT, ?, OutputT> getCombineFn();
 
   /**
-   * A factory for creating aggregators.
+   * @deprecated this is for use only by runners and exists only for a migration period. Please
+   * use the identical interface in org.apache.beam.runners.core
    */
+  @Deprecated
   interface AggregatorFactory {
     /**
      * Create an aggregator with the given {@code name} and {@link CombineFn}.
@@ -87,11 +89,4 @@ public interface Aggregator<InputT, OutputT> {
         Class<?> fnClass, ExecutionContext.StepContext stepContext,
         String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
   }
-
-  // TODO: Consider the following additional API conveniences:
-  // - In addition to createAggregator(), consider adding getAggregator() to
-  //   avoid the need to store the aggregator locally in a DoFn, i.e., create
-  //   if not already present.
-  // - Add a shortcut for the most common aggregator:
-  //   c.createAggregator("name", new Sum.SumIntegerFn()).
 }


[11/14] incubator-beam git commit: Fix flipped conditional in SimpleDoFnRunner

Posted by ke...@apache.org.
Fix flipped conditional in SimpleDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0c8d30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0c8d30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0c8d30d

Branch: refs/heads/master
Commit: f0c8d30d61c631fea642becde38b9cc52e873b5e
Parents: 043ebec
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 13:53:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0c8d30d/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1cf56a6..dec9905 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -125,11 +125,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
   @Override
   public void processElement(WindowedValue<InputT> compressedElem) {
     if (observesWindow) {
-      invokeProcessElement(compressedElem);
-    } else {
       for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
         invokeProcessElement(elem);
       }
+    } else {
+      invokeProcessElement(compressedElem);
     }
   }
 


[10/14] incubator-beam git commit: Make DoFnTester aggregator initialization idempotent

Posted by ke...@apache.org.
Make DoFnTester aggregator initialization idempotent


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/043ebeca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/043ebeca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/043ebeca

Branch: refs/heads/master
Commit: 043ebecacf7a8e96939b025afa8480c6df2f3b41
Parents: 2ab955d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 13:35:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java    | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/043ebeca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 302bb02..7995719 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -543,6 +543,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
         final String name,
         final CombineFn<AinT, AccT, AoutT> combiner) {
+
       Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
         @Override
         public void addValue(AinT value) {
@@ -561,7 +562,22 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
           return combiner;
         }
       };
-      accumulators.put(name, combiner.createAccumulator());
+
+      // Aggregator instantiation is idempotent
+      if (accumulators.containsKey(name)) {
+        Class<?> currentAccumClass = accumulators.get(name).getClass();
+        Class<?> createAccumClass = combiner.createAccumulator().getClass();
+        checkState(
+            currentAccumClass.isAssignableFrom(createAccumClass),
+            "Aggregator %s already initialized with accumulator type %s "
+                + "but was re-initialized with accumulator type %s",
+            name,
+            currentAccumClass,
+            createAccumClass);
+
+      } else {
+        accumulators.put(name, combiner.createAccumulator());
+      }
       return aggregator;
     }
 


[13/14] incubator-beam git commit: Make aggregator registration idempotent in FlinkRunner

Posted by ke...@apache.org.
Make aggregator registration idempotent in FlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2089c5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2089c5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2089c5cd

Branch: refs/heads/master
Commit: 2089c5cd2662a2eeea39ac7ebd1bfd8bcdc1aa16
Parents: 1919d8b
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 21:26:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:26:48 2016 -0700

----------------------------------------------------------------------
 .../flink/translation/functions/FlinkProcessContext.java  | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2089c5cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index fa5eb1a..baf97cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
@@ -316,7 +318,13 @@ class FlinkProcessContext<InputT, OutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
     SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
         new SerializableFnAggregatorWrapper<>(combiner);
-    runtimeContext.addAccumulator(name, wrapper);
+    Accumulator<?, ?> existingAccum =
+        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+    if (existingAccum != null) {
+      return wrapper;
+    } else {
+      runtimeContext.addAccumulator(name, wrapper);
+    }
     return wrapper;
   }
 }


[02/14] incubator-beam git commit: Port some of ParDoTest to new DoFn

Posted by ke...@apache.org.
Port some of ParDoTest to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30940179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30940179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30940179

Branch: refs/heads/master
Commit: 3094017956913b583a9bd8be5ce685683b591669
Parents: c2e751f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:35:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/DoFn.java   |  7 ++++---
 .../java/org/apache/beam/sdk/util/StringUtils.java  |  2 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java   | 16 +++++++---------
 3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 11ca853..018877f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -216,9 +215,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
             String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
 
     /**
-     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this
+     * context.
      *
-     * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+     * <p>This method should be called by runners before the {@link StartBundle @StartBundle}
+     * method.
      */
     @Experimental(Kind.AGGREGATOR)
     protected final void setupDelegateAggregators() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 4f81eef..1c52c1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -96,7 +96,7 @@ public class StringUtils {
   }
 
   private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "Fn"};
+      new String[]{"OldDoFn", "DoFn", "Fn"};
 
   /**
    * Pattern to match a non-anonymous inner class.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52244a0..d3ea9fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -90,13 +90,11 @@ public class ParDoTest implements Serializable {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
-  private static class PrintingOldDoFn extends OldDoFn<String, String> implements
-      RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
+  private static class PrintingDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       c.output(c.element() + ":" + c.timestamp().getMillis()
-          + ":" + c.window().maxTimestamp().getMillis());
+          + ":" + window.maxTimestamp().getMillis());
     }
   }
 
@@ -848,7 +846,7 @@ public class ParDoTest implements Serializable {
           output5.getName());
     }
 
-    assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
+    assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
 
     assertEquals(
         "ParMultiDo(SideOutputDummy)",
@@ -1381,7 +1379,7 @@ public class ParDoTest implements Serializable {
                     System.out.println("Finish: 3");
                   }
                 }))
-        .apply(ParDo.of(new PrintingOldDoFn()));
+        .apply(ParDo.of(new PrintingDoFn()));
 
     PAssert.that(output).satisfies(new Checker());
 


[04/14] incubator-beam git commit: Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess

Posted by ke...@apache.org.
Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess

This should become either a DoFn or, probably more appropriately,
something internal to the runners that actually require it to be
manifested as a DoFn at all.

As an intermediate migration step, this reduces its dependence on
unsupported APIs: it now calls c.window() rather than
c.windowingInternals().windows().


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2350417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2350417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2350417

Branch: refs/heads/master
Commit: b2350417f73ae6c34f849ff0e93d5bd93df3088d
Parents: 164ee56
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2350417/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
index 8f3f540..6da4da0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -28,20 +29,13 @@ import org.apache.beam.sdk.values.KV;
  * @param <V> the type of the values of the input {@code PCollection}
  */
 @SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V>
-    extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>>
+    implements RequiresWindowAccess {
   @Override
-  public void processElement(ProcessContext c)
-      throws Exception {
+  public void processElement(ProcessContext c) throws Exception {
     KV<K, V> kv = c.element();
     K key = kv.getKey();
     V value = kv.getValue();
-    c.output(KV.of(
-        key,
-        WindowedValue.of(
-            value,
-            c.timestamp(),
-            c.windowingInternals().windows(),
-            c.pane())));
+    c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane())));
   }
 }