You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/02 07:10:43 UTC

[3/4] beam git commit: Remove Aggregators from StatefulDoFn runner

Remove Aggregators from StatefulDoFn runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdbff494
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdbff494
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdbff494

Branch: refs/heads/master
Commit: fdbff494f8face174ab3a4e5005dcf5744889121
Parents: e92ead5
Author: Pablo <pa...@google.com>
Authored: Thu Apr 27 09:43:41 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   2 -
 .../apache/beam/runners/core/DoFnRunners.java   |  11 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  12 --
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   7 -
 .../beam/runners/core/StatefulDoFnRunner.java   |  12 +-
 .../runners/core/StatefulDoFnRunnerTest.java    |  54 ++------
 .../runners/direct/AggregatorContainerTest.java | 137 -------------------
 .../runners/direct/EvaluationContextTest.java   |  33 -----
 .../wrappers/streaming/DoFnOperator.java        |   2 -
 9 files changed, 17 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 9b5a75c..b66d818 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -369,8 +369,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
           doFn,
           doFnRunner,
-          stepContext,
-          null,
           windowingStrategy,
           cleanupTimer,
           stateCleaner);

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 4384b39..26e57f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -24,9 +24,7 @@ import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -132,21 +130,14 @@ public class DoFnRunners {
       DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
           DoFn<InputT, OutputT> fn,
           DoFnRunner<InputT, OutputT> doFnRunner,
-          StepContext stepContext,
-          AggregatorFactory aggregatorFactory,
           WindowingStrategy<?, ?> windowingStrategy,
           CleanupTimer cleanupTimer,
           StateCleaner<W> stateCleaner) {
-    Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
-        fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
-        Sum.ofLongs());
-
     return new StatefulDoFnRunner<>(
         doFnRunner,
         windowingStrategy,
         cleanupTimer,
-        stateCleaner,
-        droppedDueToLateness);
+        stateCleaner);
   }
 
   public static <InputT, OutputT, RestrictionT>

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 05572ea..651458f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,12 +42,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
     return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
   }
 
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
-
   private final WindowingStrategy<Object, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
@@ -99,8 +91,4 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
         (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
     return asFn;
   }
-
-  public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
-    return droppedDueToLateness;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 7e96136..2bd9ee0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -38,9 +36,4 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
     extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
   public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 4f15822..7a20590 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -19,7 +19,8 @@ package org.apache.beam.runners.core;
 
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -49,7 +50,8 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
   private final DoFnRunner<InputT, OutputT> doFnRunner;
   private final WindowingStrategy<?, ?> windowingStrategy;
-  private final Aggregator<Long, Long> droppedDueToLateness;
+  private final Counter droppedDueToLateness = Metrics.counter(
+      StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
   private final CleanupTimer cleanupTimer;
   private final StateCleaner stateCleaner;
 
@@ -57,15 +59,13 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<InputT, OutputT> doFnRunner,
       WindowingStrategy<?, ?> windowingStrategy,
       CleanupTimer cleanupTimer,
-      StateCleaner<W> stateCleaner,
-      Aggregator<Long, Long> droppedDueToLateness) {
+      StateCleaner<W> stateCleaner) {
     this.doFnRunner = doFnRunner;
     this.windowingStrategy = windowingStrategy;
     this.cleanupTimer = cleanupTimer;
     this.stateCleaner = stateCleaner;
     WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
     rejectMergingWindowFn(windowFn);
-    this.droppedDueToLateness = droppedDueToLateness;
   }
 
   private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
@@ -91,7 +91,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.addValue(1L);
+        droppedDueToLateness.inc();
         WindowTracing.debug(
             "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
                 + "since too far behind inputWatermark:{}",

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 46cbd7d..aeaa63b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -27,10 +27,10 @@ import java.util.Collections;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -72,8 +72,6 @@ public class StatefulDoFnRunnerTest {
 
   @Mock StepContext mockStepContext;
 
-  private InMemoryLongSumAggregator droppedDueToLateness;
-  private AggregatorFactory aggregatorFactory;
   private InMemoryStateInternals<String> stateInternals;
   private InMemoryTimerInternals timerInternals;
 
@@ -86,16 +84,6 @@ public class StatefulDoFnRunnerTest {
   public void setup() {
     MockitoAnnotations.initMocks(this);
     when(mockStepContext.timerInternals()).thenReturn(timerInternals);
-    droppedDueToLateness = new InMemoryLongSumAggregator("droppedDueToLateness");
-
-    aggregatorFactory = new AggregatorFactory() {
-      @Override
-      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-          Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
-          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-        return (Aggregator<InputT, OutputT>) droppedDueToLateness;
-      }
-    };
 
     stateInternals = new InMemoryStateInternals<>("hello");
     timerInternals = new InMemoryTimerInternals();
@@ -106,6 +94,7 @@ public class StatefulDoFnRunnerTest {
 
   @Test
   public void testLateDropping() throws Exception {
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
 
     timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
     timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -115,8 +104,6 @@ public class StatefulDoFnRunnerTest {
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
         fn,
         getDoFnRunner(fn),
-        mockStepContext,
-        aggregatorFactory,
         WINDOWING_STRATEGY,
         new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
         new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -129,7 +116,12 @@ public class StatefulDoFnRunnerTest {
 
     runner.processElement(
         WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
-    assertEquals(1L, droppedDueToLateness.sum);
+
+
+    long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(StatefulDoFnRunner.class,
+            StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue();
+    assertEquals(1L, droppedValues);
 
     runner.finishBundle();
   }
@@ -144,8 +136,6 @@ public class StatefulDoFnRunnerTest {
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
         fn,
         getDoFnRunner(fn),
-        mockStepContext,
-        aggregatorFactory,
         WINDOWING_STRATEGY,
         new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
         new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -247,28 +237,4 @@ public class StatefulDoFnRunnerTest {
       state.write(currentValue + 1);
     }
   };
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public Combine.CombineFn<Long, ?, Long> getCombineFn() {
-      return Sum.ofLongs();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
deleted file mode 100644
index 37524eb..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AggregatorContainer}.
- */
-@RunWith(JUnit4.class)
-public class AggregatorContainerTest {
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-  private final AggregatorContainer container = AggregatorContainer.create();
-
-  private static final String STEP_NAME = "step";
-  private final Class<?> fn = getClass();
-
-  @Mock
-  private StepContext stepContext;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    when(stepContext.getStepName()).thenReturn(STEP_NAME);
-  }
-
-  @Test
-  public void addsAggregatorsOnCommit() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
-    mutator.commit();
-
-    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
-
-    mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
-
-    assertThat("Shouldn't update value until commit",
-        (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
-    mutator.commit();
-    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(13));
-  }
-
-  @Test
-  public void failToCreateAfterCommit() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.commit();
-
-    thrown.expect(IllegalStateException.class);
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
-  }
-
-  @Test
-  public void failToAddValueAfterCommit() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    Aggregator<Integer, ?> aggregator =
-        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
-    mutator.commit();
-
-    thrown.expect(IllegalStateException.class);
-    aggregator.addValue(5);
-  }
-
-  @Test
-  public void failToAddValueAfterCommitWithPrevious() {
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
-    mutator.commit();
-
-    mutator = container.createMutator();
-    Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", Sum.ofIntegers());
-    mutator.commit();
-
-    thrown.expect(IllegalStateException.class);
-    aggregator.addValue(5);
-  }
-
-  @Test
-  public void concurrentWrites() throws InterruptedException {
-    ExecutorService executor = Executors.newFixedThreadPool(20);
-    int sum = 0;
-    for (int i = 0; i < 100; i++) {
-      sum += i;
-      final int value = i;
-      final AggregatorContainer.Mutator mutator = container.createMutator();
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          mutator.createAggregatorForDoFn(
-              fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
-          mutator.commit();
-        }
-      });
-    }
-    executor.shutdown();
-    assertThat("Expected all threads to complete after 5 seconds",
-        executor.awaitTermination(5, TimeUnit.SECONDS), equalTo(true));
-
-    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(sum));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 35b6709..0c3a8ed 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -242,38 +241,6 @@ public class EvaluationContextTest {
   }
 
   @Test
-  public void handleResultCommitsAggregators() {
-    Class<?> fn = getClass();
-    DirectExecutionContext fooContext =
-        context.getExecutionContext(createdProducer, null);
-    DirectExecutionContext.StepContext stepContext = fooContext.createStepContext(
-        "STEP", createdProducer.getTransform().getName());
-    AggregatorContainer container = context.getAggregatorContainer();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
-
-    TransformResult<?> result =
-        StepTransformResult.withoutHold(createdProducer)
-            .withAggregatorChanges(mutator)
-            .build();
-    context.handleResult(null, ImmutableList.<TimerData>of(), result);
-    assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
-
-    AggregatorContainer.Mutator mutatorAgain = container.createMutator();
-    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
-
-    TransformResult<?> secondResult =
-        StepTransformResult.withoutHold(downstreamProducer)
-            .withAggregatorChanges(mutatorAgain)
-            .build();
-    context.handleResult(
-        context.createBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        secondResult);
-    assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(16L));
-  }
-
-  @Test
   public void handleResultStoresState() {
     StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
     DirectExecutionContext fooContext =

http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 62d7a9c..54eb770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -319,8 +319,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
           doFn,
           doFnRunner,
-          stepContext,
-          aggregatorFactory,
           windowingStrategy,
           cleanupTimer,
           stateCleaner);