You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/10 18:34:40 UTC

[1/4] incubator-beam git commit: Inline MapAggregatorValues to remove dependencies

Repository: incubator-beam
Updated Branches:
  refs/heads/master fc87a0ca7 -> fa0e395f0


Inline MapAggregatorValues to remove dependencies

This class is trivial. Adding it to the public API of the SDK is
not desirable, since it is just for runners. Adding it to runners-core
would be OK but is really overkill for a glorified Map.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9da4bbcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9da4bbcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9da4bbcd

Branch: refs/heads/master
Commit: 9da4bbcdaf3c19ee5f78836b7cffaab947861a58
Parents: c867790
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 21 20:24:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 13:55:24 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 18 +++++--
 .../runners/dataflow/DataflowPipelineJob.java   | 17 ++++++-
 .../beam/sdk/util/MapAggregatorValues.java      | 50 --------------------
 3 files changed, 30 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 743c565..a9c8ecb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.MapAggregatorValues;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -47,6 +46,7 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -321,7 +321,7 @@ public class DirectRunner
         throws AggregatorRetrievalException {
       AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
       Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
-      Map<String, T> stepValues = new HashMap<>();
+      final Map<String, T> stepValues = new HashMap<>();
       for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
         if (steps.contains(transform.getTransform())) {
           T aggregate = aggregators.getAggregate(
@@ -331,7 +331,19 @@ public class DirectRunner
           }
         }
       }
-      return new MapAggregatorValues<>(stepValues);
+      return new AggregatorValues<T>() {
+        @Override
+        public Map<String, T> getValuesAtSteps() {
+          return stepValues;
+        }
+
+        @Override
+        public String toString() {
+          return MoreObjects.toStringHelper(this)
+              .add("stepValues", stepValues)
+              .toString();
+        }
+      };
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 3194f7c..a6baa4f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.MapAggregatorValues;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.util.BackOff;
@@ -41,6 +40,7 @@ import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -369,7 +369,20 @@ public class DataflowPipelineJob implements PipelineResult {
   public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
       throws AggregatorRetrievalException {
     try {
-      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
+      final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator);
+      return new AggregatorValues<OutputT>() {
+        @Override
+        public Map<String, OutputT> getValuesAtSteps() {
+          return stepValues;
+        }
+
+        @Override
+        public String toString() {
+          return MoreObjects.toStringHelper(this)
+              .add("stepValues", stepValues)
+              .toString();
+        }
+      };
     } catch (IOException e) {
       throw new AggregatorRetrievalException(
           "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
deleted file mode 100644
index 3d949ec..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.runners.AggregatorValues;
-import org.apache.beam.sdk.transforms.Aggregator;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Map;
-
-/**
- * An {@link AggregatorValues} implementation that is backed by an in-memory map.
- *
- * @param <T> the output type of the {@link Aggregator}
- */
-public class MapAggregatorValues<T> extends AggregatorValues<T> {
-  private final Map<String, T> stepValues;
-
-  public MapAggregatorValues(Map<String, T> stepValues) {
-    this.stepValues = stepValues;
-  }
-
-  @Override
-  public Map<String, T> getValuesAtSteps() {
-    return stepValues;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("stepValues", stepValues)
-        .toString();
-  }
-}


[3/4] incubator-beam git commit: Move aggregator support classes out of runners namespace, make private

Posted by ke...@apache.org.
Move aggregator support classes out of runners namespace, make private


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/adec254d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/adec254d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/adec254d

Branch: refs/heads/master
Commit: adec254d5fdb409e786a1fc2bcee38f8a7a04408
Parents: 9da4bbc
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 1 14:56:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 10 11:34:03 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |   7 +-
 .../beam/runners/flink/FlinkRunnerResult.java   |   4 +-
 .../runners/dataflow/DataflowPipelineJob.java   |   4 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +-
 .../dataflow/DataflowPipelineJobTest.java       |   4 +-
 .../spark/translation/EvaluationContext.java    |   4 +-
 .../spark/translation/SparkRuntimeContext.java  |   2 +-
 .../translation/MultiOutputWordCountTest.java   |   2 +-
 .../beam/sdk/AggregatorPipelineExtractor.java   |  93 ++++++++
 .../beam/sdk/AggregatorRetrievalException.java  |  33 +++
 .../org/apache/beam/sdk/AggregatorValues.java   |  52 +++++
 .../main/java/org/apache/beam/sdk/Pipeline.java |  10 +
 .../org/apache/beam/sdk/PipelineResult.java     |   2 -
 .../runners/AggregatorPipelineExtractor.java    |  93 --------
 .../runners/AggregatorRetrievalException.java   |  33 ---
 .../beam/sdk/runners/AggregatorValues.java      |  52 -----
 .../sdk/AggregatorPipelineExtractorTest.java    | 229 +++++++++++++++++++
 .../AggregatorPipelineExtractorTest.java        | 229 -------------------
 .../apache/beam/sdk/transforms/DoFnTest.java    |   1 +
 .../apache/beam/sdk/transforms/OldDoFnTest.java |   3 +-
 20 files changed, 434 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index a9c8ecb..f2b781e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -20,15 +20,14 @@ package org.apache.beam.runners.direct;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -244,7 +243,7 @@ public class DirectRunner
     executor.start(consumerTrackingVisitor.getRootTransforms());
 
     Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
+        pipeline.getAggregatorSteps();
     DirectPipelineResult result =
         new DirectPipelineResult(executor, context, aggregatorSteps);
     if (options.isBlockOnRun()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index cae0b2a..923d54c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index a6baa4f..e043e23 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -23,9 +23,9 @@ import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fadd9c7..3b68e92 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -71,7 +71,6 @@ import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -596,9 +595,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Obtain all of the extractors from the PTransforms used in the pipeline so the
     // DataflowPipelineJob has access to them.
-    AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline);
     Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        aggregatorExtractor.getAggregatorSteps();
+        pipeline.getAggregatorSteps();
 
     DataflowAggregatorTransforms aggregatorTransforms =
         new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 343d538..e6277d9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -35,10 +35,10 @@ import static org.mockito.Mockito.when;
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 169c2af..4ccac0e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -22,10 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 46f5b33..c2edd02 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -20,12 +20,12 @@ package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.aggregators.AggAccumParam;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Max;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 291f7b2..0d0c0b4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.ApproximateUnique;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
new file mode 100644
index 0000000..ac215c9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AggregatorRetriever;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
+ * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
+ */
+class AggregatorPipelineExtractor {
+  private final Pipeline pipeline;
+
+  /**
+   * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
+   */
+  public AggregatorPipelineExtractor(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link
+   * PTransform PTransforms} in which it is used.
+   */
+  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
+    HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
+    pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
+    return aggregatorSteps.asMap();
+  }
+
+  private static class AggregatorVisitor extends PipelineVisitor.Defaults {
+    private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
+
+    public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
+      this.aggregatorSteps = aggregatorSteps;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(TransformTreeNode node) {
+      PTransform<?, ?> transform = node.getTransform();
+      addStepToAggregators(transform, getAggregators(transform));
+    }
+
+    private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
+      if (transform != null) {
+        if (transform instanceof ParDo.Bound) {
+          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
+        } else if (transform instanceof ParDo.BoundMulti) {
+          return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
+        }
+      }
+      return Collections.emptyList();
+    }
+
+    private void addStepToAggregators(
+        PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
+      for (Aggregator<?, ?> aggregator : aggregators) {
+        aggregatorSteps.put(aggregator, transform);
+      }
+    }
+
+    @Override
+    public void visitValue(PValue value, TransformTreeNode producer) {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
new file mode 100644
index 0000000..3040815
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+
+/**
+ * Signals that an exception has occurred while retrieving {@link Aggregator}s.
+ */
+public class AggregatorRetrievalException extends Exception {
+  /**
+   * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
+   * cause.
+   */
+  public AggregatorRetrievalException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
new file mode 100644
index 0000000..efaad85
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * A collection of values associated with an {@link Aggregator}. Aggregators declared in an
+ * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis.
+ *
+ * @param <T> the output type of the aggregator
+ */
+public abstract class AggregatorValues<T> {
+  /**
+   * Get the values of the {@link Aggregator} at all steps it was used.
+   */
+  public Collection<T> getValues() {
+    return getValuesAtSteps().values();
+  }
+
+  /**
+   * Get the values of the {@link Aggregator} by the user name at each step it was used.
+   */
+  public abstract Map<String, T> getValuesAtSteps();
+
+  /**
+   * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
+   */
+  public T getTotalValue(CombineFn<T, ?, T> combineFn) {
+    return combineFn.apply(getValues());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index e4f3e4a..1bbc56f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -47,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -518,6 +520,14 @@ public class Pipeline {
   }
 
   /**
+   * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link
+   * PTransform PTransforms} in which it is used.
+   */
+  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
+    return new AggregatorPipelineExtractor(this).getAggregatorSteps();
+  }
+
+  /**
    * Builds a name from a "/"-delimited prefix and a name.
    */
   private String buildName(String namePrefix, String name) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index 993962c..edfc924 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk;
 
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
deleted file mode 100644
index 146ddfa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
- * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
- */
-public class AggregatorPipelineExtractor {
-  private final Pipeline pipeline;
-
-  /**
-   * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
-   */
-  public AggregatorPipelineExtractor(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  /**
-   * Returns a {@link Map} between each {@link Aggregator} in the {@link Pipeline} to the {@link
-   * PTransform PTransforms} in which it is used.
-   */
-  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
-    HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
-    pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
-    return aggregatorSteps.asMap();
-  }
-
-  private static class AggregatorVisitor extends PipelineVisitor.Defaults {
-    private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
-
-    public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
-      this.aggregatorSteps = aggregatorSteps;
-    }
-
-    @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      addStepToAggregators(transform, getAggregators(transform));
-    }
-
-    private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
-      if (transform != null) {
-        if (transform instanceof ParDo.Bound) {
-          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
-        } else if (transform instanceof ParDo.BoundMulti) {
-          return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
-        }
-      }
-      return Collections.emptyList();
-    }
-
-    private void addStepToAggregators(
-        PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
-      for (Aggregator<?, ?> aggregator : aggregators) {
-        aggregatorSteps.put(aggregator, transform);
-      }
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java
deleted file mode 100644
index a0973c3..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-
-/**
- * Signals that an exception has occurred while retrieving {@link Aggregator}s.
- */
-public class AggregatorRetrievalException extends Exception {
-  /**
-   * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
-   * cause.
-   */
-  public AggregatorRetrievalException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
deleted file mode 100644
index 6f6836e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis.
- *
- * @param <T> the output type of the aggregator
- */
-public abstract class AggregatorValues<T> {
-  /**
-   * Get the values of the {@link Aggregator} at all steps it was used.
-   */
-  public Collection<T> getValues() {
-    return getValuesAtSteps().values();
-  }
-
-  /**
-   * Get the values of the {@link Aggregator} by the user name at each step it was used.
-   */
-  public abstract Map<String, T> getValuesAtSteps();
-
-  /**
-   * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
-   */
-  public T getTotalValue(CombineFn<T, ?, T> combineFn) {
-    return combineFn.apply(getValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
new file mode 100644
index 0000000..930fbe7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests for {@link AggregatorPipelineExtractor}.
+ */
+@RunWith(JUnit4.class)
+public class AggregatorPipelineExtractorTest {
+  @Mock
+  private Pipeline p;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetAggregatorStepsWithParDoBoundExtractsSteps() {
+    @SuppressWarnings("rawtypes")
+    ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
+    AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
+    when(bound.getFn()).thenReturn(fn);
+
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
+
+    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    when(transformNode.getTransform()).thenReturn(bound);
+
+    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
+        .when(p)
+        .traverseTopologically(Mockito.any(PipelineVisitor.class));
+
+    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
+
+    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+        extractor.getAggregatorSteps();
+
+    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne));
+    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorTwo));
+    assertEquals(aggregatorSteps.size(), 2);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() {
+    @SuppressWarnings("rawtypes")
+    ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti");
+    AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
+    when(bound.getFn()).thenReturn(fn);
+
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
+    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+
+    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    when(transformNode.getTransform()).thenReturn(bound);
+
+    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
+        .when(p)
+        .traverseTopologically(Mockito.any(PipelineVisitor.class));
+
+    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
+
+    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+        extractor.getAggregatorSteps();
+
+    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne));
+    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorTwo));
+    assertEquals(2, aggregatorSteps.size());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() {
+    @SuppressWarnings("rawtypes")
+    ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
+    @SuppressWarnings("rawtypes")
+    ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
+    AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
+    when(bound.getFn()).thenReturn(fn);
+    when(otherBound.getFn()).thenReturn(fn);
+
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+
+    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    when(transformNode.getTransform()).thenReturn(bound);
+    TransformTreeNode otherTransformNode = mock(TransformTreeNode.class);
+    when(otherTransformNode.getTransform()).thenReturn(otherBound);
+
+    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode)))
+        .when(p)
+        .traverseTopologically(Mockito.any(PipelineVisitor.class));
+
+    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
+
+    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+        extractor.getAggregatorSteps();
+
+    assertEquals(
+        ImmutableSet.<PTransform<?, ?>>of(bound, otherBound), aggregatorSteps.get(aggregatorOne));
+    assertEquals(
+        ImmutableSet.<PTransform<?, ?>>of(bound, otherBound), aggregatorSteps.get(aggregatorTwo));
+    assertEquals(2, aggregatorSteps.size());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetAggregatorStepsWithDifferentStepsAddsSteps() {
+    @SuppressWarnings("rawtypes")
+    ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
+
+    AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+
+    when(bound.getFn()).thenReturn(fn);
+
+    @SuppressWarnings("rawtypes")
+    ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
+
+    AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
+    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
+
+    when(otherBound.getFn()).thenReturn(otherFn);
+
+    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    when(transformNode.getTransform()).thenReturn(bound);
+    TransformTreeNode otherTransformNode = mock(TransformTreeNode.class);
+    when(otherTransformNode.getTransform()).thenReturn(otherBound);
+
+    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode)))
+        .when(p)
+        .traverseTopologically(Mockito.any(PipelineVisitor.class));
+
+    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
+
+    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+        extractor.getAggregatorSteps();
+
+    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne));
+    assertEquals(ImmutableSet.<PTransform<?, ?>>of(otherBound), aggregatorSteps.get(aggregatorTwo));
+    assertEquals(2, aggregatorSteps.size());
+  }
+
+  private static class VisitNodesAnswer implements Answer<Object> {
+    private final List<TransformTreeNode> nodes;
+
+    public VisitNodesAnswer(List<TransformTreeNode> nodes) {
+      this.nodes = nodes;
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      PipelineVisitor visitor = (PipelineVisitor) invocation.getArguments()[0];
+      for (TransformTreeNode node : nodes) {
+        visitor.visitPrimitiveTransform(node);
+      }
+      return null;
+    }
+  }
+
+  private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> {
+    public <InputT, OutT> Aggregator<InputT, OutT> addAggregator(
+        CombineFn<InputT, ?, OutT> combiner) {
+      return createAggregator(randomName(), combiner);
+    }
+
+    private String randomName() {
+      return UUID.randomUUID().toString();
+    }
+
+    @Override
+    public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception {
+      fail();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
deleted file mode 100644
index 13476e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Tests for {@link AggregatorPipelineExtractor}.
- */
-@RunWith(JUnit4.class)
-public class AggregatorPipelineExtractorTest {
-  @Mock
-  private Pipeline p;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithParDoBoundExtractsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
-    AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getFn()).thenReturn(fn);
-
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
-
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
-    when(transformNode.getTransform()).thenReturn(bound);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne));
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(aggregatorSteps.size(), 2);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti");
-    AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getFn()).thenReturn(fn);
-
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
-
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
-    when(transformNode.getTransform()).thenReturn(bound);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne));
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(2, aggregatorSteps.size());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
-    @SuppressWarnings("rawtypes")
-    ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
-    AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getFn()).thenReturn(fn);
-    when(otherBound.getFn()).thenReturn(fn);
-
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
-
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
-    when(transformNode.getTransform()).thenReturn(bound);
-    TransformTreeNode otherTransformNode = mock(TransformTreeNode.class);
-    when(otherTransformNode.getTransform()).thenReturn(otherBound);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(
-        ImmutableSet.<PTransform<?, ?>>of(bound, otherBound), aggregatorSteps.get(aggregatorOne));
-    assertEquals(
-        ImmutableSet.<PTransform<?, ?>>of(bound, otherBound), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(2, aggregatorSteps.size());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithDifferentStepsAddsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
-
-    AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-
-    when(bound.getFn()).thenReturn(fn);
-
-    @SuppressWarnings("rawtypes")
-    ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
-
-    AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
-    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
-
-    when(otherBound.getFn()).thenReturn(otherFn);
-
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
-    when(transformNode.getTransform()).thenReturn(bound);
-    TransformTreeNode otherTransformNode = mock(TransformTreeNode.class);
-    when(otherTransformNode.getTransform()).thenReturn(otherBound);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne));
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(otherBound), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(2, aggregatorSteps.size());
-  }
-
-  private static class VisitNodesAnswer implements Answer<Object> {
-    private final List<TransformTreeNode> nodes;
-
-    public VisitNodesAnswer(List<TransformTreeNode> nodes) {
-      this.nodes = nodes;
-    }
-
-    @Override
-    public Object answer(InvocationOnMock invocation) throws Throwable {
-      PipelineVisitor visitor = (PipelineVisitor) invocation.getArguments()[0];
-      for (TransformTreeNode node : nodes) {
-        visitor.visitPrimitiveTransform(node);
-      }
-      return null;
-    }
-  }
-
-  private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> {
-    public <InputT, OutT> Aggregator<InputT, OutT> addAggregator(
-        CombineFn<InputT, ?, OutT> combiner) {
-      return createAggregator(randomName(), combiner);
-    }
-
-    private String randomName() {
-      return UUID.randomUUID().toString();
-    }
-
-    @Override
-    public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception {
-      fail();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index 710e4ce..3fb3193 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -128,6 +128,7 @@ public class DoFnTest implements Serializable {
     DoFn<Void, Void> doFn = new NoOpDoFn();
 
     Aggregator<Double, Double> aggregatorOne =
+
         doFn.createAggregator(nameOne, combiner);
     Aggregator<Double, Double> aggregatorTwo =
         doFn.createAggregator(nameTwo, combiner);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adec254d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index 9d144b3..5946d9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -24,10 +24,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import com.google.common.collect.ImmutableMap;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;


[4/4] incubator-beam git commit: This closes #699

Posted by ke...@apache.org.
This closes #699


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa0e395f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa0e395f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa0e395f

Branch: refs/heads/master
Commit: fa0e395f0ad3e874206066881a273869e75d96a3
Parents: fc87a0c adec254
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 10 11:34:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 10 11:34:17 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  25 +-
 .../beam/runners/flink/FlinkRunnerResult.java   |   4 +-
 .../runners/dataflow/DataflowPipelineJob.java   |  21 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +-
 .../dataflow/DataflowPipelineJobTest.java       |   4 +-
 .../spark/translation/EvaluationContext.java    |   4 +-
 .../spark/translation/SparkRuntimeContext.java  |   2 +-
 .../translation/MultiOutputWordCountTest.java   |   2 +-
 .../beam/sdk/AggregatorPipelineExtractor.java   |  93 ++++++++
 .../beam/sdk/AggregatorRetrievalException.java  |  33 +++
 .../org/apache/beam/sdk/AggregatorValues.java   |  52 +++++
 .../main/java/org/apache/beam/sdk/Pipeline.java |  10 +
 .../org/apache/beam/sdk/PipelineResult.java     |   2 -
 .../runners/AggregatorPipelineExtractor.java    |  93 --------
 .../runners/AggregatorRetrievalException.java   |  33 ---
 .../beam/sdk/runners/AggregatorValues.java      |  52 -----
 .../beam/sdk/util/MapAggregatorValues.java      |  50 ----
 .../sdk/AggregatorPipelineExtractorTest.java    | 229 +++++++++++++++++++
 .../AggregatorPipelineExtractorTest.java        | 229 -------------------
 .../apache/beam/sdk/transforms/DoFnTest.java    |   1 +
 .../apache/beam/sdk/transforms/OldDoFnTest.java |   3 +-
 21 files changed, 464 insertions(+), 482 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa0e395f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------


[2/4] incubator-beam git commit: Fix MapAggregatorValues use of toStringHelper

Posted by ke...@apache.org.
Fix MapAggregatorValues use of toStringHelper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c867790a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c867790a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c867790a

Branch: refs/heads/master
Commit: c867790abad182bcbffa9a7a88de76a608b6d039
Parents: d60a0a0
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 21 20:18:05 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 13:55:24 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/util/MapAggregatorValues.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c867790a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
index fe02666..3d949ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
@@ -43,7 +43,7 @@ public class MapAggregatorValues<T> extends AggregatorValues<T> {
 
   @Override
   public String toString() {
-    return MoreObjects.toStringHelper(MapAggregatorValues.class)
+    return MoreObjects.toStringHelper(this)
         .add("stepValues", stepValues)
         .toString();
   }