You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text export.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/02 07:10:41 UTC

[1/4] beam git commit: Remove accumulators from DoFn tester.

Repository: beam
Updated Branches:
  refs/heads/master e92ead58c -> fad07f6b0


Remove accumulators from DoFn tester.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/650e8685
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/650e8685
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/650e8685

Branch: refs/heads/master
Commit: 650e86854cb72258de81378a357c93ff887da338
Parents: b20d983
Author: Pablo <pa...@users.noreply.github.com>
Authored: Thu Apr 27 21:53:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java  | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/650e8685/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 12f718b..70fb0ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -675,8 +675,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
       new HashMap<>();
 
-  private Map<String, Object> accumulators;
-
   /** The output tags used by the {@link DoFn} under test. */
   private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
 
@@ -732,6 +730,5 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     fnInvoker = DoFnInvokers.invokerFor(fn);
     fnInvoker.invokeSetup();
     outputs = new HashMap<>();
-    accumulators = new HashMap<>();
   }
 }


[2/4] beam git commit: Removing Aggregator from core runner code

Posted by dh...@apache.org.
Removing Aggregator from core runner code


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b20d9835
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b20d9835
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b20d9835

Branch: refs/heads/master
Commit: b20d983536c707973189b485eabef6aa00e8ce42
Parents: fdbff49
Author: Pablo <pa...@google.com>
Authored: Thu Apr 27 16:10:48 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/OldDoFn.java   | 131 -------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |   4 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |  19 ---
 .../core/GroupAlsoByWindowsProperties.java      |   7 -
 .../apache/beam/runners/core/NoOpOldDoFn.java   |   7 -
 .../apache/beam/runners/core/OldDoFnTest.java   | 109 ---------------
 .../apache/beam/sdk/transforms/DoFnTester.java  |  11 --
 7 files changed, 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
index 323edf9..419c837 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -17,27 +17,13 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -180,46 +166,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
      * @see ParDo.SingleOutput#withOutputTags
      */
     public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
-
-    /**
-     * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
-     * specified name and aggregation logic specified by {@link CombineFn}.
-     *
-     * <p>For internal use only.
-     *
-     * @param name the name of the aggregator
-     * @param combiner the {@link CombineFn} to use in the aggregator
-     * @return an aggregator for the provided name and {@link CombineFn} in this
-     *         context
-     */
-    @Experimental(Kind.AGGREGATOR)
-    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
-    /**
-     * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
-     * usable within this context.
-     *
-     * <p>This method should be called by runners before {@link OldDoFn#startBundle}
-     * is executed.
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected final void setupDelegateAggregators() {
-      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
-        setupDelegateAggregator(aggregator);
-      }
-
-      aggregatorsAreFinal = true;
-    }
-
-    private <AggInputT, AggOutputT> void setupDelegateAggregator(
-        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
-      Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
-          aggregator.getName(), aggregator.getCombineFn());
-
-      aggregator.setDelegate(delegate);
-    }
   }
 
   /**
@@ -316,22 +262,8 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   public interface RequiresWindowAccess {}
 
   public OldDoFn() {
-    this(new HashMap<String, DelegatingAggregator<?, ?>>());
-  }
-
-  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
-    this.aggregators = aggregators;
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
   /**
    * Prepares this {@link DoFn} instance for processing bundles.
    *
@@ -400,67 +332,4 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
-   * during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this OldDoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-
-    checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
-   * created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this OldDoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
-      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}.
-   */
-  Collection<Aggregator<?, ?>> getAggregators() {
-    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a5733da..1865d54 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -133,7 +133,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
             mainOutputTag,
             additionalOutputTags,
             stepContext,
-            aggregatorFactory,
             windowingStrategy.getWindowFn());
   }
 
@@ -240,7 +239,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     final OutputManager outputManager;
     final TupleTag<OutputT> mainOutputTag;
     final StepContext stepContext;
-    final AggregatorFactory aggregatorFactory;
     final WindowFn<?, ?> windowFn;
 
     /**
@@ -257,7 +255,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         TupleTag<OutputT> mainOutputTag,
         List<TupleTag<?>> additionalOutputTags,
         StepContext stepContext,
-        AggregatorFactory aggregatorFactory,
         WindowFn<?, ?> windowFn) {
       fn.super();
       this.options = options;
@@ -273,7 +270,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       }
 
       this.stepContext = stepContext;
-      this.aggregatorFactory = aggregatorFactory;
       this.windowFn = windowFn;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 6320a3a..b8db491 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -29,8 +29,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -168,7 +166,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
     final OutputManager outputManager;
     final TupleTag<OutputT> mainOutputTag;
     final StepContext stepContext;
-    final AggregatorFactory aggregatorFactory;
     final WindowFn<?, ?> windowFn;
 
     /**
@@ -200,9 +197,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
       }
 
       this.stepContext = stepContext;
-      this.aggregatorFactory = aggregatorFactory;
       this.windowFn = windowFn;
-      super.setupDelegateAggregators();
     }
 
     //////////////////////////////////////////////////////////////////////////////
@@ -329,13 +324,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
       checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null");
       outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
     }
-
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
-      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
-    }
   }
 
   /**
@@ -511,12 +499,5 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
         }
       };
     }
-
-    @Override
-    public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
-        createAggregatorInternal(
-            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 81ac5fa..bc33366 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -37,7 +37,6 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -738,12 +737,6 @@ public class GroupAlsoByWindowsProperties {
       throw new UnsupportedOperationException();
     }
 
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
-
     public List<WindowedValue<KV<K, OutputT>>> getOutput() {
       return output;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
index 2e5cd6d..581c3e0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
@@ -63,10 +61,5 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output,
         Instant timestamp) {
     }
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return null;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index d6838e2..f608a81 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -18,19 +18,10 @@
 package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -47,106 +38,6 @@ public class OldDoFnTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testCreateAggregatorWithNullNameThrowsException() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("name cannot be null");
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator(null, Sum.ofLongs());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullCombineFnThrowsException() {
-    CombineFn<Object, Object, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
-    SerializableFunction<Iterable<Object>, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithSameNameThrowsException() {
-    String name = "testAggregator";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Cannot create");
-    thrown.expectMessage(name);
-    thrown.expectMessage("already exists");
-
-    doFn.createAggregator(name, combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
-    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception { }
-    };
-    OldDoFn<String, String>.Context context = createContext(fn);
-    context.setupDelegateAggregators();
-
-    thrown.expect(isA(IllegalStateException.class));
-    fn.createAggregator("anyAggregate", Max.ofIntegers());
-  }
-
-  private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
-    return fn.new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void output(String output) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void outputWithTimestamp(String output, Instant timestamp) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <T> void output(TupleTag<T> tag, T output) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <AggInputT, AggOutputT>
-      Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  @Test
   public void testPopulateDisplayDataDefaultBehavior() {
     OldDoFn<String, String> usesDefault =
         new OldDoFn<String, String>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 813975c..12f718b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,7 +33,6 @@ import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -502,16 +501,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return resultElems;
   }
 
-  private <AccumT, AggregateT> AggregateT extractAggregatorValue(
-      String name, CombineFn<?, AccumT, AggregateT> combiner) {
-    @SuppressWarnings("unchecked")
-    AccumT accumulator = (AccumT) accumulators.get(name);
-    if (accumulator == null) {
-      accumulator = combiner.createAccumulator();
-    }
-    return combiner.extractOutput(accumulator);
-  }
-
   private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);


[3/4] beam git commit: Remove Aggregators from StatefulDoFn runner

Posted by dh...@apache.org.
Remove Aggregators from StatefulDoFn runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdbff494
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdbff494
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdbff494

Branch: refs/heads/master
Commit: fdbff494f8face174ab3a4e5005dcf5744889121
Parents: e92ead5
Author: Pablo <pa...@google.com>
Authored: Thu Apr 27 09:43:41 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   2 -
 .../apache/beam/runners/core/DoFnRunners.java   |  11 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  12 --
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   7 -
 .../beam/runners/core/StatefulDoFnRunner.java   |  12 +-
 .../runners/core/StatefulDoFnRunnerTest.java    |  54 ++------
 .../runners/direct/AggregatorContainerTest.java | 137 -------------------
 .../runners/direct/EvaluationContextTest.java   |  33 -----
 .../wrappers/streaming/DoFnOperator.java        |   2 -
 9 files changed, 17 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 9b5a75c..b66d818 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -369,8 +369,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
           doFn,
           doFnRunner,
-          stepContext,
-          null,
           windowingStrategy,
           cleanupTimer,
           stateCleaner);

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 4384b39..26e57f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -24,9 +24,7 @@ import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -132,21 +130,14 @@ public class DoFnRunners {
       DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
           DoFn<InputT, OutputT> fn,
           DoFnRunner<InputT, OutputT> doFnRunner,
-          StepContext stepContext,
-          AggregatorFactory aggregatorFactory,
           WindowingStrategy<?, ?> windowingStrategy,
           CleanupTimer cleanupTimer,
           StateCleaner<W> stateCleaner) {
-    Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
-        fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
-        Sum.ofLongs());
-
     return new StatefulDoFnRunner<>(
         doFnRunner,
         windowingStrategy,
         cleanupTimer,
-        stateCleaner,
-        droppedDueToLateness);
+        stateCleaner);
   }
 
   public static <InputT, OutputT, RestrictionT>

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 05572ea..651458f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,12 +42,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
     return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
   }
 
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
-
   private final WindowingStrategy<Object, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
@@ -99,8 +91,4 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
         (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
     return asFn;
   }
-
-  public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
-    return droppedDueToLateness;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 7e96136..2bd9ee0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -38,9 +36,4 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
     extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
   public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 4f15822..7a20590 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -19,7 +19,8 @@ package org.apache.beam.runners.core;
 
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -49,7 +50,8 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
   private final DoFnRunner<InputT, OutputT> doFnRunner;
   private final WindowingStrategy<?, ?> windowingStrategy;
-  private final Aggregator<Long, Long> droppedDueToLateness;
+  private final Counter droppedDueToLateness = Metrics.counter(
+      StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
   private final CleanupTimer cleanupTimer;
   private final StateCleaner stateCleaner;
 
@@ -57,15 +59,13 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<InputT, OutputT> doFnRunner,
       WindowingStrategy<?, ?> windowingStrategy,
       CleanupTimer cleanupTimer,
-      StateCleaner<W> stateCleaner,
-      Aggregator<Long, Long> droppedDueToLateness) {
+      StateCleaner<W> stateCleaner) {
     this.doFnRunner = doFnRunner;
     this.windowingStrategy = windowingStrategy;
     this.cleanupTimer = cleanupTimer;
     this.stateCleaner = stateCleaner;
     WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
     rejectMergingWindowFn(windowFn);
-    this.droppedDueToLateness = droppedDueToLateness;
   }
 
   private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
@@ -91,7 +91,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.addValue(1L);
+        droppedDueToLateness.inc();
         WindowTracing.debug(
             "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
                 + "since too far behind inputWatermark:{}",

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 46cbd7d..aeaa63b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -27,10 +27,10 @@ import java.util.Collections;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -72,8 +72,6 @@ public class StatefulDoFnRunnerTest {
 
   @Mock StepContext mockStepContext;
 
-  private InMemoryLongSumAggregator droppedDueToLateness;
-  private AggregatorFactory aggregatorFactory;
   private InMemoryStateInternals<String> stateInternals;
   private InMemoryTimerInternals timerInternals;
 
@@ -86,16 +84,6 @@ public class StatefulDoFnRunnerTest {
   public void setup() {
     MockitoAnnotations.initMocks(this);
     when(mockStepContext.timerInternals()).thenReturn(timerInternals);
-    droppedDueToLateness = new InMemoryLongSumAggregator("droppedDueToLateness");
-
-    aggregatorFactory = new AggregatorFactory() {
-      @Override
-      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-          Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
-          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-        return (Aggregator<InputT, OutputT>) droppedDueToLateness;
-      }
-    };
 
     stateInternals = new InMemoryStateInternals<>("hello");
     timerInternals = new InMemoryTimerInternals();
@@ -106,6 +94,7 @@ public class StatefulDoFnRunnerTest {
 
   @Test
   public void testLateDropping() throws Exception {
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
 
     timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
     timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -115,8 +104,6 @@ public class StatefulDoFnRunnerTest {
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
         fn,
         getDoFnRunner(fn),
-        mockStepContext,
-        aggregatorFactory,
         WINDOWING_STRATEGY,
         new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
         new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -129,7 +116,12 @@ public class StatefulDoFnRunnerTest {
 
     runner.processElement(
         WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
-    assertEquals(1L, droppedDueToLateness.sum);
+
+
+    long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(StatefulDoFnRunner.class,
+            StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue();
+    assertEquals(1L, droppedValues);
 
     runner.finishBundle();
   }
@@ -144,8 +136,6 @@ public class StatefulDoFnRunnerTest {
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
         fn,
         getDoFnRunner(fn),
-        mockStepContext,
-        aggregatorFactory,
         WINDOWING_STRATEGY,
         new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
         new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -247,28 +237,4 @@ public class StatefulDoFnRunnerTest {
       state.write(currentValue + 1);
     }
   };
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public Combine.CombineFn<Long, ?, Long> getCombineFn() {
-      return Sum.ofLongs();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
deleted file mode 100644
index 37524eb..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AggregatorContainer}.
- */
-@RunWith(JUnit4.class)
-public class AggregatorContainerTest {
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-  private final AggregatorContainer container = AggregatorContainer.create();
-
-  private static final String STEP_NAME = "step";
-  private final Class<?> fn = getClass();
-
-  @Mock
-  private StepContext stepContext;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    when(stepContext.getStepName()).thenReturn(STEP_NAME);
-  }
-
-  @Test
-  public void addsAggregatorsOnCommit() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
-    mutator.commit();
-
-    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
-
-    mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
-
-    assertThat("Shouldn't update value until commit",
-        (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
-    mutator.commit();
-    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(13));
-  }
-
-  @Test
-  public void failToCreateAfterCommit() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.commit();
-
-    thrown.expect(IllegalStateException.class);
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
-  }
-
-  @Test
-  public void failToAddValueAfterCommit() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    Aggregator<Integer, ?> aggregator =
-        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
-    mutator.commit();
-
-    thrown.expect(IllegalStateException.class);
-    aggregator.addValue(5);
-  }
-
-  @Test
-  public void failToAddValueAfterCommitWithPrevious() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
-    mutator.commit();
-
-    mutator = container.createMutator();
-    Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", Sum.ofIntegers());
-    mutator.commit();
-
-    thrown.expect(IllegalStateException.class);
-    aggregator.addValue(5);
-  }
-
-  @Test
-  public void concurrentWrites() throws InterruptedException {
-    ExecutorService executor = Executors.newFixedThreadPool(20);
-    int sum = 0;
-    for (int i = 0; i < 100; i++) {
-      sum += i;
-      final int value = i;
-      final AggregatorContainer.Mutator mutator = container.createMutator();
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          mutator.createAggregatorForDoFn(
-              fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
-          mutator.commit();
-        }
-      });
-    }
-    executor.shutdown();
-    assertThat("Expected all threads to complete after 5 seconds",
-        executor.awaitTermination(5, TimeUnit.SECONDS), equalTo(true));
-
-    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(sum));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 35b6709..0c3a8ed 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -242,38 +241,6 @@ public class EvaluationContextTest {
   }
 
   @Test
-  public void handleResultCommitsAggregators() {
-    Class<?> fn = getClass();
-    DirectExecutionContext fooContext =
-        context.getExecutionContext(createdProducer, null);
-    DirectExecutionContext.StepContext stepContext = fooContext.createStepContext(
-        "STEP", createdProducer.getTransform().getName());
-    AggregatorContainer container = context.getAggregatorContainer();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
-
-    TransformResult<?> result =
-        StepTransformResult.withoutHold(createdProducer)
-            .withAggregatorChanges(mutator)
-            .build();
-    context.handleResult(null, ImmutableList.<TimerData>of(), result);
-    assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
-
-    AggregatorContainer.Mutator mutatorAgain = container.createMutator();
-    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
-
-    TransformResult<?> secondResult =
-        StepTransformResult.withoutHold(downstreamProducer)
-            .withAggregatorChanges(mutatorAgain)
-            .build();
-    context.handleResult(
-        context.createBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        secondResult);
-    assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(16L));
-  }
-
-  @Test
   public void handleResultStoresState() {
     StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
     DirectExecutionContext fooContext =

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 62d7a9c..54eb770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -319,8 +319,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
           doFn,
           doFnRunner,
-          stepContext,
-          aggregatorFactory,
           windowingStrategy,
           cleanupTimer,
           stateCleaner);


[4/4] beam git commit: This closes #2744

Posted by dh...@apache.org.
This closes #2744


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fad07f6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fad07f6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fad07f6b

Branch: refs/heads/master
Commit: fad07f6b04f9d9f962607245b997d542330ef422
Parents: e92ead5 650e868
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 2 00:10:33 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:33 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   2 -
 .../apache/beam/runners/core/DoFnRunners.java   |  11 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  12 --
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   7 -
 .../org/apache/beam/runners/core/OldDoFn.java   | 131 ------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |   4 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |  19 ---
 .../beam/runners/core/StatefulDoFnRunner.java   |  12 +-
 .../core/GroupAlsoByWindowsProperties.java      |   7 -
 .../apache/beam/runners/core/NoOpOldDoFn.java   |   7 -
 .../apache/beam/runners/core/OldDoFnTest.java   | 109 ---------------
 .../runners/core/StatefulDoFnRunnerTest.java    |  54 ++------
 .../runners/direct/AggregatorContainerTest.java | 137 -------------------
 .../runners/direct/EvaluationContextTest.java   |  33 -----
 .../wrappers/streaming/DoFnOperator.java        |   2 -
 .../apache/beam/sdk/transforms/DoFnTester.java  |  14 --
 16 files changed, 17 insertions(+), 544 deletions(-)
----------------------------------------------------------------------