You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:34 UTC
[30/50] incubator-beam git commit: Delegate getAggregators() in various DoFn adapters
Delegate getAggregators() in various DoFn adapters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d46c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d46c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d46c2d
Branch: refs/heads/python-sdk
Commit: b0d46c2deb4318f8d0e55eeeb20e1d11ceadd218
Parents: 6fa8057
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 15:50:17 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/transforms/DoFn.java | 7 ++++++-
.../org/apache/beam/sdk/transforms/DoFnAdapters.java | 6 ++++++
.../java/org/apache/beam/sdk/transforms/OldDoFn.java | 7 ++++++-
.../java/org/apache/beam/sdk/transforms/OldDoFnTest.java | 11 +++++++----
4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9978ef4..221d942 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -28,6 +28,8 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
@@ -236,7 +238,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
aggregator.setDelegate(delegate);
}
-
}
/**
@@ -298,6 +299,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
+ Collection<Aggregator<?, ?>> getAggregators() {
+ return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
+ }
+
/**
* Protects aggregators from being created after initialization.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index a3466bb..1a74ae7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import java.io.IOException;
+import java.util.Collection;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -244,6 +245,11 @@ public class DoFnAdapters {
}
@Override
+ Collection<Aggregator<?, ?>> getAggregators() {
+ return fn.getAggregators();
+ }
+
+ @Override
public Duration getAllowedTimestampSkew() {
return fn.getAllowedTimestampSkew();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index f16e0b3..9bf9003 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -676,6 +676,11 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
}
@Override
+ Collection<Aggregator<?, ?>> getAggregators() {
+ return OldDoFn.this.getAggregators();
+ }
+
+ @Override
protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
return OldDoFn.this.getOutputTypeDescriptor();
}
@@ -683,7 +688,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
/**
* A {@link ProcessContext} for an {@link OldDoFn} that implements
- * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}.
+ * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
*/
private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index e7ae135..07e3078 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.AggregatorValues;
@@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -216,14 +216,17 @@ public class OldDoFnTest implements Serializable {
Pipeline pipeline = TestPipeline.create();
CountOddsFn countOdds = new CountOddsFn();
- pipeline
+ PCollection<Void> output = pipeline
.apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
.apply(ParDo.of(countOdds));
PipelineResult result = pipeline.run();
AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
- assertThat(values.getValuesAtSteps(),
- equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
+
+ Map<String, Integer> valuesMap = values.getValuesAtSteps();
+
+ assertThat(valuesMap.size(), equalTo(1));
+ assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4));
}
private static class CountOddsFn extends OldDoFn<Integer, Void> {