You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/02 07:10:43 UTC
[3/4] beam git commit: Remove Aggregators from StatefulDoFn runner
Remove Aggregators from StatefulDoFn runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdbff494
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdbff494
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdbff494
Branch: refs/heads/master
Commit: fdbff494f8face174ab3a4e5005dcf5744889121
Parents: e92ead5
Author: Pablo <pa...@google.com>
Authored: Thu Apr 27 09:43:41 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 2 -
.../apache/beam/runners/core/DoFnRunners.java | 11 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 12 --
.../runners/core/GroupAlsoByWindowsDoFn.java | 7 -
.../beam/runners/core/StatefulDoFnRunner.java | 12 +-
.../runners/core/StatefulDoFnRunnerTest.java | 54 ++------
.../runners/direct/AggregatorContainerTest.java | 137 -------------------
.../runners/direct/EvaluationContextTest.java | 33 -----
.../wrappers/streaming/DoFnOperator.java | 2 -
9 files changed, 17 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 9b5a75c..b66d818 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -369,8 +369,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
doFn,
doFnRunner,
- stepContext,
- null,
windowingStrategy,
cleanupTimer,
stateCleaner);
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 4384b39..26e57f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -24,9 +24,7 @@ import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
@@ -132,21 +130,14 @@ public class DoFnRunners {
DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
DoFn<InputT, OutputT> fn,
DoFnRunner<InputT, OutputT> doFnRunner,
- StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer cleanupTimer,
StateCleaner<W> stateCleaner) {
- Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
- fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
- Sum.ofLongs());
-
return new StatefulDoFnRunner<>(
doFnRunner,
windowingStrategy,
cleanupTimer,
- stateCleaner,
- droppedDueToLateness);
+ stateCleaner);
}
public static <InputT, OutputT, RestrictionT>
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 05572ea..651458f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,12 +42,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
}
- protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
- protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
-
private final WindowingStrategy<Object, W> windowingStrategy;
private final StateInternalsFactory<K> stateInternalsFactory;
private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
@@ -99,8 +91,4 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
(OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
return asFn;
}
-
- public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
- return droppedDueToLateness;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 7e96136..2bd9ee0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.core;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
@@ -38,9 +36,4 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
- protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
- protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 4f15822..7a20590 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -19,7 +19,8 @@ package org.apache.beam.runners.core;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -49,7 +50,8 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
private final DoFnRunner<InputT, OutputT> doFnRunner;
private final WindowingStrategy<?, ?> windowingStrategy;
- private final Aggregator<Long, Long> droppedDueToLateness;
+ private final Counter droppedDueToLateness = Metrics.counter(
+ StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
private final CleanupTimer cleanupTimer;
private final StateCleaner stateCleaner;
@@ -57,15 +59,13 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
DoFnRunner<InputT, OutputT> doFnRunner,
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer cleanupTimer,
- StateCleaner<W> stateCleaner,
- Aggregator<Long, Long> droppedDueToLateness) {
+ StateCleaner<W> stateCleaner) {
this.doFnRunner = doFnRunner;
this.windowingStrategy = windowingStrategy;
this.cleanupTimer = cleanupTimer;
this.stateCleaner = stateCleaner;
WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
rejectMergingWindowFn(windowFn);
- this.droppedDueToLateness = droppedDueToLateness;
}
private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
@@ -91,7 +91,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
if (isLate(window)) {
// The element is too late for this window.
- droppedDueToLateness.addValue(1L);
+ droppedDueToLateness.inc();
WindowTracing.debug(
"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+ "since too far behind inputWatermark:{}",
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 46cbd7d..aeaa63b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -27,10 +27,10 @@ import java.util.Collections;
import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -72,8 +72,6 @@ public class StatefulDoFnRunnerTest {
@Mock StepContext mockStepContext;
- private InMemoryLongSumAggregator droppedDueToLateness;
- private AggregatorFactory aggregatorFactory;
private InMemoryStateInternals<String> stateInternals;
private InMemoryTimerInternals timerInternals;
@@ -86,16 +84,6 @@ public class StatefulDoFnRunnerTest {
public void setup() {
MockitoAnnotations.initMocks(this);
when(mockStepContext.timerInternals()).thenReturn(timerInternals);
- droppedDueToLateness = new InMemoryLongSumAggregator("droppedDueToLateness");
-
- aggregatorFactory = new AggregatorFactory() {
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
- return (Aggregator<InputT, OutputT>) droppedDueToLateness;
- }
- };
stateInternals = new InMemoryStateInternals<>("hello");
timerInternals = new InMemoryTimerInternals();
@@ -106,6 +94,7 @@ public class StatefulDoFnRunnerTest {
@Test
public void testLateDropping() throws Exception {
+ MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -115,8 +104,6 @@ public class StatefulDoFnRunnerTest {
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
fn,
getDoFnRunner(fn),
- mockStepContext,
- aggregatorFactory,
WINDOWING_STRATEGY,
new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -129,7 +116,12 @@ public class StatefulDoFnRunnerTest {
runner.processElement(
WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
- assertEquals(1L, droppedDueToLateness.sum);
+
+
+ long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+ MetricName.named(StatefulDoFnRunner.class,
+ StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue();
+ assertEquals(1L, droppedValues);
runner.finishBundle();
}
@@ -144,8 +136,6 @@ public class StatefulDoFnRunnerTest {
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
fn,
getDoFnRunner(fn),
- mockStepContext,
- aggregatorFactory,
WINDOWING_STRATEGY,
new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -247,28 +237,4 @@ public class StatefulDoFnRunnerTest {
state.write(currentValue + 1);
}
};
-
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
- private final String name;
- private long sum = 0;
-
- public InMemoryLongSumAggregator(String name) {
- this.name = name;
- }
-
- @Override
- public void addValue(Long value) {
- sum += value;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public Combine.CombineFn<Long, ?, Long> getCombineFn() {
- return Sum.ofLongs();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
deleted file mode 100644
index 37524eb..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AggregatorContainer}.
- */
-@RunWith(JUnit4.class)
-public class AggregatorContainerTest {
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
- private final AggregatorContainer container = AggregatorContainer.create();
-
- private static final String STEP_NAME = "step";
- private final Class<?> fn = getClass();
-
- @Mock
- private StepContext stepContext;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- when(stepContext.getStepName()).thenReturn(STEP_NAME);
- }
-
- @Test
- public void addsAggregatorsOnCommit() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
- mutator.commit();
-
- assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
-
- mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
-
- assertThat("Shouldn't update value until commit",
- (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
- mutator.commit();
- assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(13));
- }
-
- @Test
- public void failToCreateAfterCommit() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.commit();
-
- thrown.expect(IllegalStateException.class);
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
- }
-
- @Test
- public void failToAddValueAfterCommit() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- Aggregator<Integer, ?> aggregator =
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
- mutator.commit();
-
- thrown.expect(IllegalStateException.class);
- aggregator.addValue(5);
- }
-
- @Test
- public void failToAddValueAfterCommitWithPrevious() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
- mutator.commit();
-
- mutator = container.createMutator();
- Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", Sum.ofIntegers());
- mutator.commit();
-
- thrown.expect(IllegalStateException.class);
- aggregator.addValue(5);
- }
-
- @Test
- public void concurrentWrites() throws InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(20);
- int sum = 0;
- for (int i = 0; i < 100; i++) {
- sum += i;
- final int value = i;
- final AggregatorContainer.Mutator mutator = container.createMutator();
- executor.submit(new Runnable() {
- @Override
- public void run() {
- mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
- mutator.commit();
- }
- });
- }
- executor.shutdown();
- assertThat("Expected all threads to complete after 5 seconds",
- executor.awaitTermination(5, TimeUnit.SECONDS), equalTo(true));
-
- assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(sum));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 35b6709..0c3a8ed 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -242,38 +241,6 @@ public class EvaluationContextTest {
}
@Test
- public void handleResultCommitsAggregators() {
- Class<?> fn = getClass();
- DirectExecutionContext fooContext =
- context.getExecutionContext(createdProducer, null);
- DirectExecutionContext.StepContext stepContext = fooContext.createStepContext(
- "STEP", createdProducer.getTransform().getName());
- AggregatorContainer container = context.getAggregatorContainer();
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
-
- TransformResult<?> result =
- StepTransformResult.withoutHold(createdProducer)
- .withAggregatorChanges(mutator)
- .build();
- context.handleResult(null, ImmutableList.<TimerData>of(), result);
- assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
-
- AggregatorContainer.Mutator mutatorAgain = container.createMutator();
- mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
-
- TransformResult<?> secondResult =
- StepTransformResult.withoutHold(downstreamProducer)
- .withAggregatorChanges(mutatorAgain)
- .build();
- context.handleResult(
- context.createBundle(created).commit(Instant.now()),
- ImmutableList.<TimerData>of(),
- secondResult);
- assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(16L));
- }
-
- @Test
public void handleResultStoresState() {
StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
DirectExecutionContext fooContext =
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 62d7a9c..54eb770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -319,8 +319,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
doFn,
doFnRunner,
- stepContext,
- aggregatorFactory,
windowingStrategy,
cleanupTimer,
stateCleaner);