You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/20 15:51:28 UTC
[1/2] beam git commit: [BEAM-1948] Defend against absent Aggregators
Repository: beam
Updated Branches:
refs/heads/master 104f98235 -> 4e0c8333c
[BEAM-1948] Defend against absent Aggregators
Add protection against a NullPointerException when the aggregator key is not
present in aggregatorSteps.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8434acf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8434acf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8434acf
Branch: refs/heads/master
Commit: b8434acf6081c1e411e9818dc64d6329db6c729e
Parents: 104f982
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue Apr 18 15:38:05 2017 +0200
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 20 08:50:33 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 14 ++++----
.../beam/runners/direct/DirectRunnerTest.java | 37 ++++++++++++++++++++
2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b8434acf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 43147a0..45a04a7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -383,12 +383,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
final Map<String, T> stepValues = new HashMap<>();
- for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
- if (steps.contains(transform.getTransform())) {
- T aggregate = aggregators.getAggregate(
- evaluationContext.getStepName(transform), aggregator.getName());
- if (aggregate != null) {
- stepValues.put(transform.getFullName(), aggregate);
+ if (steps != null) {
+ for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
+ if (steps.contains(transform.getTransform())) {
+ T aggregate = aggregators
+ .getAggregate(evaluationContext.getStepName(transform), aggregator.getName());
+ if (aggregate != null) {
+ stepValues.put(transform.getFullName(), aggregate);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b8434acf/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index ed19be2..246c111 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -66,9 +69,13 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -525,6 +532,36 @@ public class DirectRunnerTest implements Serializable {
p.run();
}
+
+ @Test
+ public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
+ Pipeline p = getPipeline();
+ IdentityDoFn identityDoFn = new IdentityDoFn();
+ p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", "element3")))
+ .apply(ParDo.of(identityDoFn));
+ PipelineResult pipelineResult = p.run();
+ pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
+ }
+
+ private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
+ private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs());
+ private static final String STATE_ID = "state";
+ @StateId(STATE_ID)
+ private static final StateSpec<Object, ValueState<String>> stateSpec =
+ StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state){
+ state.write("state content");
+ counter.addValue(1L);
+ context.output(context.element().getValue());
+ }
+
+ public Aggregator<Long, Long> getCounter() {
+ return counter;
+ }
+ }
+
private static class LongNoDecodeCoder extends AtomicCoder<Long> {
@Override
public void encode(
[2/2] beam git commit: This closes #2573
Posted by tg...@apache.org.
This closes #2573
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e0c8333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e0c8333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e0c8333
Branch: refs/heads/master
Commit: 4e0c8333c210f644382f872674a7a4ff03d672e1
Parents: 104f982 b8434ac
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 20 08:51:16 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 20 08:51:16 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 14 ++++----
.../beam/runners/direct/DirectRunnerTest.java | 37 ++++++++++++++++++++
2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------