You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/02 07:10:41 UTC
[1/4] beam git commit: Remove accumulators from DoFn tester.
Repository: beam
Updated Branches:
refs/heads/master e92ead58c -> fad07f6b0
Remove accumulators from DoFn tester.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/650e8685
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/650e8685
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/650e8685
Branch: refs/heads/master
Commit: 650e86854cb72258de81378a357c93ff887da338
Parents: b20d983
Author: Pablo <pa...@users.noreply.github.com>
Authored: Thu Apr 27 21:53:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/650e8685/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 12f718b..70fb0ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -675,8 +675,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
new HashMap<>();
- private Map<String, Object> accumulators;
-
/** The output tags used by the {@link DoFn} under test. */
private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
@@ -732,6 +730,5 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
fnInvoker = DoFnInvokers.invokerFor(fn);
fnInvoker.invokeSetup();
outputs = new HashMap<>();
- accumulators = new HashMap<>();
}
}
[2/4] beam git commit: Removing Aggregator from core runner code
Posted by dh...@apache.org.
Removing Aggregator from core runner code
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b20d9835
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b20d9835
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b20d9835
Branch: refs/heads/master
Commit: b20d983536c707973189b485eabef6aa00e8ce42
Parents: fdbff49
Author: Pablo <pa...@google.com>
Authored: Thu Apr 27 16:10:48 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/runners/core/OldDoFn.java | 131 -------------------
.../beam/runners/core/SimpleDoFnRunner.java | 4 -
.../beam/runners/core/SimpleOldDoFnRunner.java | 19 ---
.../core/GroupAlsoByWindowsProperties.java | 7 -
.../apache/beam/runners/core/NoOpOldDoFn.java | 7 -
.../apache/beam/runners/core/OldDoFnTest.java | 109 ---------------
.../apache/beam/sdk/transforms/DoFnTester.java | 11 --
7 files changed, 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
index 323edf9..419c837 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -17,27 +17,13 @@
*/
package org.apache.beam.runners.core;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DelegatingAggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -180,46 +166,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
* @see ParDo.SingleOutput#withOutputTags
*/
public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
-
- /**
- * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
- * specified name and aggregation logic specified by {@link CombineFn}.
- *
- * <p>For internal use only.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and {@link CombineFn} in this
- * context
- */
- @Experimental(Kind.AGGREGATOR)
- public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
- /**
- * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
- * usable within this context.
- *
- * <p>This method should be called by runners before {@link OldDoFn#startBundle}
- * is executed.
- */
- @Experimental(Kind.AGGREGATOR)
- protected final void setupDelegateAggregators() {
- for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
- setupDelegateAggregator(aggregator);
- }
-
- aggregatorsAreFinal = true;
- }
-
- private <AggInputT, AggOutputT> void setupDelegateAggregator(
- DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
- Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
- aggregator.getName(), aggregator.getCombineFn());
-
- aggregator.setDelegate(delegate);
- }
}
/**
@@ -316,22 +262,8 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
public interface RequiresWindowAccess {}
public OldDoFn() {
- this(new HashMap<String, DelegatingAggregator<?, ?>>());
- }
-
- public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
- this.aggregators = aggregators;
}
- /////////////////////////////////////////////////////////////////////////////
-
- private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
- /**
- * Protects aggregators from being created after initialization.
- */
- private boolean aggregatorsAreFinal;
-
/**
* Prepares this {@link DoFn} instance for processing bundles.
*
@@ -400,67 +332,4 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
@Override
public void populateDisplayData(DisplayData.Builder builder) {
}
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Returns an {@link Aggregator} with aggregation logic specified by the
- * {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
- * during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline processing.
- */
- protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- checkNotNull(name, "name cannot be null");
- checkNotNull(combiner, "combiner cannot be null");
- checkArgument(!aggregators.containsKey(name),
- "Cannot create aggregator with name %s."
- + " An Aggregator with that name already exists within this scope.",
- name);
-
- checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing."
- + " Aggregators should be registered during pipeline construction.");
-
- DelegatingAggregator<AggInputT, AggOutputT> aggregator =
- new DelegatingAggregator<>(name, combiner);
- aggregators.put(name, aggregator);
- return aggregator;
- }
-
- /**
- * Returns an {@link Aggregator} with the aggregation logic specified by the
- * {@link SerializableFunction} argument. The name provided must be unique
- * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
- * created during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link SerializableFunction} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline processing.
- */
- protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
- SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
- checkNotNull(combiner, "combiner cannot be null.");
- return createAggregator(name, Combine.IterableCombineFn.of(combiner));
- }
-
- /**
- * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}.
- */
- Collection<Aggregator<?, ?>> getAggregators() {
- return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a5733da..1865d54 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -133,7 +133,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorFactory,
windowingStrategy.getWindowFn());
}
@@ -240,7 +239,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
final OutputManager outputManager;
final TupleTag<OutputT> mainOutputTag;
final StepContext stepContext;
- final AggregatorFactory aggregatorFactory;
final WindowFn<?, ?> windowFn;
/**
@@ -257,7 +255,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowFn<?, ?> windowFn) {
fn.super();
this.options = options;
@@ -273,7 +270,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
this.stepContext = stepContext;
- this.aggregatorFactory = aggregatorFactory;
this.windowFn = windowFn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 6320a3a..b8db491 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -29,8 +29,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -168,7 +166,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
final OutputManager outputManager;
final TupleTag<OutputT> mainOutputTag;
final StepContext stepContext;
- final AggregatorFactory aggregatorFactory;
final WindowFn<?, ?> windowFn;
/**
@@ -200,9 +197,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
this.stepContext = stepContext;
- this.aggregatorFactory = aggregatorFactory;
this.windowFn = windowFn;
- super.setupDelegateAggregators();
}
//////////////////////////////////////////////////////////////////////////////
@@ -329,13 +324,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null");
outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
}
-
- @Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
- return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
- }
}
/**
@@ -511,12 +499,5 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
};
}
-
- @Override
- public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
- createAggregatorInternal(
- String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 81ac5fa..bc33366 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -37,7 +37,6 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -738,12 +737,6 @@ public class GroupAlsoByWindowsProperties {
throw new UnsupportedOperationException();
}
- @Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- throw new UnsupportedOperationException();
- }
-
public List<WindowedValue<KV<K, OutputT>>> getOutput() {
return output;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
index 2e5cd6d..581c3e0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.core;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
@@ -63,10 +61,5 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
public <T> void outputWithTimestamp(TupleTag<T> tag, T output,
Instant timestamp) {
}
- @Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return null;
- }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index d6838e2..f608a81 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -18,19 +18,10 @@
package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -47,106 +38,6 @@ public class OldDoFnTest implements Serializable {
public transient ExpectedException thrown = ExpectedException.none();
@Test
- public void testCreateAggregatorWithNullNameThrowsException() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("name cannot be null");
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator(null, Sum.ofLongs());
- }
-
- @Test
- public void testCreateAggregatorWithNullCombineFnThrowsException() {
- CombineFn<Object, Object, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithNullSerializableFnThrowsException() {
- SerializableFunction<Iterable<Object>, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithSameNameThrowsException() {
- String name = "testAggregator";
- CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Cannot create");
- thrown.expectMessage(name);
- thrown.expectMessage("already exists");
-
- doFn.createAggregator(name, combiner);
- }
-
- @Test
- public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
- OldDoFn<String, String> fn = new OldDoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception { }
- };
- OldDoFn<String, String>.Context context = createContext(fn);
- context.setupDelegateAggregators();
-
- thrown.expect(isA(IllegalStateException.class));
- fn.createAggregator("anyAggregate", Max.ofIntegers());
- }
-
- private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
- return fn.new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void output(String output) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void outputWithTimestamp(String output, Instant timestamp) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <AggInputT, AggOutputT>
- Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Test
public void testPopulateDisplayDataDefaultBehavior() {
OldDoFn<String, String> usesDefault =
new OldDoFn<String, String>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b20d9835/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 813975c..12f718b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,7 +33,6 @@ import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -502,16 +501,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return resultElems;
}
- private <AccumT, AggregateT> AggregateT extractAggregatorValue(
- String name, CombineFn<?, AccumT, AggregateT> combiner) {
- @SuppressWarnings("unchecked")
- AccumT accumulator = (AccumT) accumulators.get(name);
- if (accumulator == null) {
- accumulator = combiner.createAccumulator();
- }
- return combiner.extractOutput(accumulator);
- }
-
private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) {
@SuppressWarnings({"unchecked", "rawtypes"})
List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);
[3/4] beam git commit: Remove Aggregators from StatefulDoFn runner
Posted by dh...@apache.org.
Remove Aggregators from StatefulDoFn runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdbff494
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdbff494
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdbff494
Branch: refs/heads/master
Commit: fdbff494f8face174ab3a4e5005dcf5744889121
Parents: e92ead5
Author: Pablo <pa...@google.com>
Authored: Thu Apr 27 09:43:41 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:17 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 2 -
.../apache/beam/runners/core/DoFnRunners.java | 11 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 12 --
.../runners/core/GroupAlsoByWindowsDoFn.java | 7 -
.../beam/runners/core/StatefulDoFnRunner.java | 12 +-
.../runners/core/StatefulDoFnRunnerTest.java | 54 ++------
.../runners/direct/AggregatorContainerTest.java | 137 -------------------
.../runners/direct/EvaluationContextTest.java | 33 -----
.../wrappers/streaming/DoFnOperator.java | 2 -
9 files changed, 17 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 9b5a75c..b66d818 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -369,8 +369,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
doFn,
doFnRunner,
- stepContext,
- null,
windowingStrategy,
cleanupTimer,
stateCleaner);
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 4384b39..26e57f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -24,9 +24,7 @@ import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
@@ -132,21 +130,14 @@ public class DoFnRunners {
DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
DoFn<InputT, OutputT> fn,
DoFnRunner<InputT, OutputT> doFnRunner,
- StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer cleanupTimer,
StateCleaner<W> stateCleaner) {
- Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
- fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
- Sum.ofLongs());
-
return new StatefulDoFnRunner<>(
doFnRunner,
windowingStrategy,
cleanupTimer,
- stateCleaner,
- droppedDueToLateness);
+ stateCleaner);
}
public static <InputT, OutputT, RestrictionT>
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 05572ea..651458f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,12 +42,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
}
- protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
- protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
-
private final WindowingStrategy<Object, W> windowingStrategy;
private final StateInternalsFactory<K> stateInternalsFactory;
private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
@@ -99,8 +91,4 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
(OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
return asFn;
}
-
- public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
- return droppedDueToLateness;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 7e96136..2bd9ee0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.core;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
@@ -38,9 +36,4 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
- protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
- protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 4f15822..7a20590 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -19,7 +19,8 @@ package org.apache.beam.runners.core;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -49,7 +50,8 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
private final DoFnRunner<InputT, OutputT> doFnRunner;
private final WindowingStrategy<?, ?> windowingStrategy;
- private final Aggregator<Long, Long> droppedDueToLateness;
+ private final Counter droppedDueToLateness = Metrics.counter(
+ StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
private final CleanupTimer cleanupTimer;
private final StateCleaner stateCleaner;
@@ -57,15 +59,13 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
DoFnRunner<InputT, OutputT> doFnRunner,
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer cleanupTimer,
- StateCleaner<W> stateCleaner,
- Aggregator<Long, Long> droppedDueToLateness) {
+ StateCleaner<W> stateCleaner) {
this.doFnRunner = doFnRunner;
this.windowingStrategy = windowingStrategy;
this.cleanupTimer = cleanupTimer;
this.stateCleaner = stateCleaner;
WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
rejectMergingWindowFn(windowFn);
- this.droppedDueToLateness = droppedDueToLateness;
}
private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
@@ -91,7 +91,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
if (isLate(window)) {
// The element is too late for this window.
- droppedDueToLateness.addValue(1L);
+ droppedDueToLateness.inc();
WindowTracing.debug(
"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+ "since too far behind inputWatermark:{}",
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 46cbd7d..aeaa63b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -27,10 +27,10 @@ import java.util.Collections;
import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -72,8 +72,6 @@ public class StatefulDoFnRunnerTest {
@Mock StepContext mockStepContext;
- private InMemoryLongSumAggregator droppedDueToLateness;
- private AggregatorFactory aggregatorFactory;
private InMemoryStateInternals<String> stateInternals;
private InMemoryTimerInternals timerInternals;
@@ -86,16 +84,6 @@ public class StatefulDoFnRunnerTest {
public void setup() {
MockitoAnnotations.initMocks(this);
when(mockStepContext.timerInternals()).thenReturn(timerInternals);
- droppedDueToLateness = new InMemoryLongSumAggregator("droppedDueToLateness");
-
- aggregatorFactory = new AggregatorFactory() {
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
- return (Aggregator<InputT, OutputT>) droppedDueToLateness;
- }
- };
stateInternals = new InMemoryStateInternals<>("hello");
timerInternals = new InMemoryTimerInternals();
@@ -106,6 +94,7 @@ public class StatefulDoFnRunnerTest {
@Test
public void testLateDropping() throws Exception {
+ MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -115,8 +104,6 @@ public class StatefulDoFnRunnerTest {
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
fn,
getDoFnRunner(fn),
- mockStepContext,
- aggregatorFactory,
WINDOWING_STRATEGY,
new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -129,7 +116,12 @@ public class StatefulDoFnRunnerTest {
runner.processElement(
WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
- assertEquals(1L, droppedDueToLateness.sum);
+
+
+ long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+ MetricName.named(StatefulDoFnRunner.class,
+ StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue();
+ assertEquals(1L, droppedValues);
runner.finishBundle();
}
@@ -144,8 +136,6 @@ public class StatefulDoFnRunnerTest {
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
fn,
getDoFnRunner(fn),
- mockStepContext,
- aggregatorFactory,
WINDOWING_STRATEGY,
new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
new StatefulDoFnRunner.StateInternalsStateCleaner<>(
@@ -247,28 +237,4 @@ public class StatefulDoFnRunnerTest {
state.write(currentValue + 1);
}
};
-
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
- private final String name;
- private long sum = 0;
-
- public InMemoryLongSumAggregator(String name) {
- this.name = name;
- }
-
- @Override
- public void addValue(Long value) {
- sum += value;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public Combine.CombineFn<Long, ?, Long> getCombineFn() {
- return Sum.ofLongs();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
deleted file mode 100644
index 37524eb..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AggregatorContainer}.
- */
-@RunWith(JUnit4.class)
-public class AggregatorContainerTest {
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
- private final AggregatorContainer container = AggregatorContainer.create();
-
- private static final String STEP_NAME = "step";
- private final Class<?> fn = getClass();
-
- @Mock
- private StepContext stepContext;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- when(stepContext.getStepName()).thenReturn(STEP_NAME);
- }
-
- @Test
- public void addsAggregatorsOnCommit() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
- mutator.commit();
-
- assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
-
- mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
-
- assertThat("Shouldn't update value until commit",
- (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
- mutator.commit();
- assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(13));
- }
-
- @Test
- public void failToCreateAfterCommit() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.commit();
-
- thrown.expect(IllegalStateException.class);
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
- }
-
- @Test
- public void failToAddValueAfterCommit() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- Aggregator<Integer, ?> aggregator =
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
- mutator.commit();
-
- thrown.expect(IllegalStateException.class);
- aggregator.addValue(5);
- }
-
- @Test
- public void failToAddValueAfterCommitWithPrevious() {
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
- mutator.commit();
-
- mutator = container.createMutator();
- Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", Sum.ofIntegers());
- mutator.commit();
-
- thrown.expect(IllegalStateException.class);
- aggregator.addValue(5);
- }
-
- @Test
- public void concurrentWrites() throws InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(20);
- int sum = 0;
- for (int i = 0; i < 100; i++) {
- sum += i;
- final int value = i;
- final AggregatorContainer.Mutator mutator = container.createMutator();
- executor.submit(new Runnable() {
- @Override
- public void run() {
- mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
- mutator.commit();
- }
- });
- }
- executor.shutdown();
- assertThat("Expected all threads to complete after 5 seconds",
- executor.awaitTermination(5, TimeUnit.SECONDS), equalTo(true));
-
- assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(sum));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 35b6709..0c3a8ed 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -242,38 +241,6 @@ public class EvaluationContextTest {
}
@Test
- public void handleResultCommitsAggregators() {
- Class<?> fn = getClass();
- DirectExecutionContext fooContext =
- context.getExecutionContext(createdProducer, null);
- DirectExecutionContext.StepContext stepContext = fooContext.createStepContext(
- "STEP", createdProducer.getTransform().getName());
- AggregatorContainer container = context.getAggregatorContainer();
- AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
-
- TransformResult<?> result =
- StepTransformResult.withoutHold(createdProducer)
- .withAggregatorChanges(mutator)
- .build();
- context.handleResult(null, ImmutableList.<TimerData>of(), result);
- assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
-
- AggregatorContainer.Mutator mutatorAgain = container.createMutator();
- mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
-
- TransformResult<?> secondResult =
- StepTransformResult.withoutHold(downstreamProducer)
- .withAggregatorChanges(mutatorAgain)
- .build();
- context.handleResult(
- context.createBundle(created).commit(Instant.now()),
- ImmutableList.<TimerData>of(),
- secondResult);
- assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(16L));
- }
-
- @Test
public void handleResultStoresState() {
StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
DirectExecutionContext fooContext =
http://git-wip-us.apache.org/repos/asf/beam/blob/fdbff494/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 62d7a9c..54eb770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -319,8 +319,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
doFn,
doFnRunner,
- stepContext,
- aggregatorFactory,
windowingStrategy,
cleanupTimer,
stateCleaner);
[4/4] beam git commit: This closes #2744
Posted by dh...@apache.org.
This closes #2744
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fad07f6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fad07f6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fad07f6b
Branch: refs/heads/master
Commit: fad07f6b04f9d9f962607245b997d542330ef422
Parents: e92ead5 650e868
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 2 00:10:33 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 00:10:33 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 2 -
.../apache/beam/runners/core/DoFnRunners.java | 11 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 12 --
.../runners/core/GroupAlsoByWindowsDoFn.java | 7 -
.../org/apache/beam/runners/core/OldDoFn.java | 131 ------------------
.../beam/runners/core/SimpleDoFnRunner.java | 4 -
.../beam/runners/core/SimpleOldDoFnRunner.java | 19 ---
.../beam/runners/core/StatefulDoFnRunner.java | 12 +-
.../core/GroupAlsoByWindowsProperties.java | 7 -
.../apache/beam/runners/core/NoOpOldDoFn.java | 7 -
.../apache/beam/runners/core/OldDoFnTest.java | 109 ---------------
.../runners/core/StatefulDoFnRunnerTest.java | 54 ++------
.../runners/direct/AggregatorContainerTest.java | 137 -------------------
.../runners/direct/EvaluationContextTest.java | 33 -----
.../wrappers/streaming/DoFnOperator.java | 2 -
.../apache/beam/sdk/transforms/DoFnTester.java | 14 --
16 files changed, 17 insertions(+), 544 deletions(-)
----------------------------------------------------------------------