You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:34 UTC

[30/50] incubator-beam git commit: Delegate getAggregators() in various DoFn adapters

Delegate getAggregators() in various DoFn adapters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d46c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d46c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d46c2d

Branch: refs/heads/python-sdk
Commit: b0d46c2deb4318f8d0e55eeeb20e1d11ceadd218
Parents: 6fa8057
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 15:50:17 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/transforms/DoFn.java   |  7 ++++++-
 .../org/apache/beam/sdk/transforms/DoFnAdapters.java     |  6 ++++++
 .../java/org/apache/beam/sdk/transforms/OldDoFn.java     |  7 ++++++-
 .../java/org/apache/beam/sdk/transforms/OldDoFnTest.java | 11 +++++++----
 4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9978ef4..221d942 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -28,6 +28,8 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -236,7 +238,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
       aggregator.setDelegate(delegate);
     }
-
   }
 
   /**
@@ -298,6 +299,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
   protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
 
+  Collection<Aggregator<?, ?>> getAggregators() {
+    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
+  }
+
   /**
    * Protects aggregators from being created after initialization.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index a3466bb..1a74ae7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
+import java.util.Collection;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -244,6 +245,11 @@ public class DoFnAdapters {
     }
 
     @Override
+    Collection<Aggregator<?, ?>> getAggregators() {
+      return fn.getAggregators();
+    }
+
+    @Override
     public Duration getAllowedTimestampSkew() {
       return fn.getAllowedTimestampSkew();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index f16e0b3..9bf9003 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -676,6 +676,11 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     }
 
     @Override
+    Collection<Aggregator<?, ?>> getAggregators() {
+      return OldDoFn.this.getAggregators();
+    }
+
+    @Override
     protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
       return OldDoFn.this.getOutputTypeDescriptor();
     }
@@ -683,7 +688,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
   /**
    * A {@link ProcessContext} for an {@link OldDoFn} that implements
-   * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}.
+   * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
    */
   private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index e7ae135..07e3078 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.AggregatorValues;
@@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -216,14 +216,17 @@ public class OldDoFnTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     CountOddsFn countOdds = new CountOddsFn();
-    pipeline
+    PCollection<Void> output = pipeline
         .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
         .apply(ParDo.of(countOdds));
     PipelineResult result = pipeline.run();
 
     AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
-    assertThat(values.getValuesAtSteps(),
-        equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
+
+    Map<String, Integer> valuesMap = values.getValuesAtSteps();
+
+    assertThat(valuesMap.size(), equalTo(1));
+    assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4));
   }
 
   private static class CountOddsFn extends OldDoFn<Integer, Void> {