You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:15 UTC

[2/4] beam git commit: Removing Aggregators from runner-specific examples and tests

Removing Aggregators from runner-specific examples and tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fe11d12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fe11d12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fe11d12

Branch: refs/heads/master
Commit: 1fe11d12404b53d978e70ee2c7c37582dcad10c9
Parents: 904b413
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:03:27 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   9 +-
 .../core/DoFnDelegatingAggregatorTest.java      | 144 --------
 .../beam/runners/core/OldDoFnContextTest.java   |  72 ----
 .../apache/beam/runners/core/OldDoFnTest.java   |  32 --
 .../beam/runners/direct/DirectRunner.java       |   3 -
 .../dataflow/DataflowPipelineJobTest.java       | 337 -------------------
 .../beam/runners/spark/examples/WordCount.java  |  13 +-
 8 files changed, 11 insertions(+), 601 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 4c82f46..e6e3a92 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,7 +95,7 @@ public class DebuggingWordCount {
      * in a dashboard, etc.
      */
     private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
-    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");
+    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index a1713ac..b980715 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -30,15 +30,15 @@ import org.apache.beam.runners.apex.TestApexRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -67,13 +67,12 @@ public class WordCountTest {
 
   static class ExtractWordsFn extends DoFn<String, String> {
     private static final long serialVersionUID = 1L;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+    private final Counter emptyLines = Metrics.counter("main", "emptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc(1);
       }
 
       // Split the line into words.

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
deleted file mode 100644
index b44e8a4..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DelegatingAggregator;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DelegatingAggregator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnDelegatingAggregatorTest {
-
-  @Mock
-  private Aggregator<Long, Long> delegate;
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testAddValueWithoutDelegateThrowsException() {
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("cannot be called");
-    thrown.expectMessage("DoFn");
-
-    aggregator.addValue(21.2);
-  }
-
-  @Test
-  public void testSetDelegateThenAddValueCallsDelegate() {
-    String name = "agg";
-    CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Long, Long> aggregator =
-        (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
-
-    aggregator.setDelegate(delegate);
-
-    aggregator.addValue(12L);
-
-    verify(delegate).addValue(12L);
-  }
-
-  @Test
-  public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    @SuppressWarnings("unchecked")
-    Aggregator<Double, Double> secondDelegate =
-        mock(Aggregator.class, "secondDelegate");
-
-    aggregator.setDelegate(aggregator);
-    aggregator.setDelegate(secondDelegate);
-
-    aggregator.addValue(2.25);
-
-    verify(secondDelegate).addValue(2.25);
-    verify(delegate, never()).addValue(anyLong());
-  }
-
-  @Test
-  public void testGetNameReturnsName() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-  }
-
-  @Test
-  public void testGetCombineFnReturnsCombineFn() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> CombineFn<T, ?, T> mockCombineFn(
-      @SuppressWarnings("unused") Class<T> clazz) {
-    return mock(CombineFn.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
deleted file mode 100644
index a1cd49d..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link OldDoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnContextTest {
-
-  @Mock
-  private Aggregator<Long, Long> agg;
-
-  private OldDoFn<Object, Object> fn;
-  private OldDoFn<Object, Object>.Context context;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-
-    // Need to be real objects to call the constructor, and to reference the
-    // outer instance of OldDoFn
-    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
-    OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
-    fn = spy(noOpFn);
-    context = spy(noOpContext);
-  }
-
-  @Test
-  public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-    Aggregator<Long, Long> delegateAggregator =
-        fn.createAggregator("test", combiner);
-
-    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
-    context.setupDelegateAggregators();
-    delegateAggregator.addValue(1L);
-
-    verify(agg).addValue(1L);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index 425de07..d6838e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -19,14 +19,11 @@ package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -50,19 +47,6 @@ public class OldDoFnTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testCreateAggregatorWithCombinerSucceeds() {
-    String name = "testAggregator";
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @Test
   public void testCreateAggregatorWithNullNameThrowsException() {
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("name cannot be null");
@@ -114,22 +98,6 @@ public class OldDoFnTest implements Serializable {
   }
 
   @Test
-  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
-    String nameOne = "testAggregator";
-    String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    Aggregator<Double, Double> aggregatorOne =
-        doFn.createAggregator(nameOne, combiner);
-    Aggregator<Double, Double> aggregatorTwo =
-        doFn.createAggregator(nameTwo, combiner);
-
-    assertNotEquals(aggregatorOne, aggregatorTwo);
-  }
-
-  @Test
   public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
     OldDoFn<String, String> fn = new OldDoFn<String, String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 45a04a7..77ec68f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,8 +34,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 9cab5e8..59315a7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -19,13 +19,8 @@ package org.apache.beam.runners.dataflow;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
@@ -38,36 +33,22 @@ import static org.mockito.Mockito.when;
 import com.google.api.client.util.NanoClock;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.GetMetrics;
 import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
 import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSetMultimap;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.net.SocketTimeoutException;
 import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.values.PInput;
@@ -367,323 +348,6 @@ public class DataflowPipelineJobTest {
         DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
   }
 
-  @Test
-  public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue()
-      throws IOException, AggregatorRetrievalException {
-    Aggregator<?, ?> aggregator = mock(Aggregator.class);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValues(), empty());
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue()
-      throws IOException, AggregatorRetrievalException {
-    Aggregator<?, ?> aggregator = mock(Aggregator.class);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    jobMetrics.setMetrics(null);
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValues(), empty());
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    MetricUpdate update = new MetricUpdate();
-    long stepValue = 1234L;
-    update.setScalar(new BigDecimal(stepValue));
-
-    MetricStructuredName structuredName = new MetricStructuredName();
-    structuredName.setName(aggregatorName);
-    structuredName.setContext(ImmutableMap.of("step", stepName));
-    update.setName(structuredName);
-
-    jobMetrics.setMetrics(ImmutableList.of(update));
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
-    assertThat(values.getValuesAtSteps().size(), equalTo(1));
-    assertThat(values.getValues(), contains(stepValue));
-    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue)));
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-
-    Pipeline p = Pipeline.create(options);
-
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform, p);
-
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
-    String otherStepName = "s88";
-    String otherFullName = "Spam/Ham/Eggs";
-    AppliedPTransform<?, ?, ?> otherAppliedTransform =
-        appliedPTransform(otherFullName, otherTransform, p);
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
-            aggregator, pTransform, aggregator, otherTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
-            appliedTransform, stepName, otherAppliedTransform, otherStepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    MetricUpdate updateOne = new MetricUpdate();
-    long stepValue = 1234L;
-    updateOne.setScalar(new BigDecimal(stepValue));
-
-    MetricStructuredName structuredNameOne = new MetricStructuredName();
-    structuredNameOne.setName(aggregatorName);
-    structuredNameOne.setContext(ImmutableMap.of("step", stepName));
-    updateOne.setName(structuredNameOne);
-
-    MetricUpdate updateTwo = new MetricUpdate();
-    long stepValueTwo = 1024L;
-    updateTwo.setScalar(new BigDecimal(stepValueTwo));
-
-    MetricStructuredName structuredNameTwo = new MetricStructuredName();
-    structuredNameTwo.setName(aggregatorName);
-    structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName));
-    updateTwo.setName(structuredNameTwo);
-
-    jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
-    assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo));
-    assertThat(values.getValuesAtSteps().size(), equalTo(2));
-    assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo));
-    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo)));
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    MetricUpdate ignoredUpdate = new MetricUpdate();
-    ignoredUpdate.setScalar(null);
-
-    MetricStructuredName ignoredName = new MetricStructuredName();
-    ignoredName.setName("ignoredAggregator.elementCount.out0");
-    ignoredName.setContext(null);
-    ignoredUpdate.setName(ignoredName);
-
-    jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValuesAtSteps().entrySet(), empty());
-    assertThat(values.getValues(), empty());
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithUnusedAggregatorThrowsException()
-      throws AggregatorRetrievalException {
-    Aggregator<?, ?> aggregator = mock(Aggregator.class);
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("not used in this pipeline");
-    job.getAggregatorValues(aggregator);
-  }
-
-  @Test
-  public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    IOException cause = new IOException();
-    when(getMetrics.execute()).thenThrow(cause);
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    thrown.expect(AggregatorRetrievalException.class);
-    thrown.expectCause(is(cause));
-    thrown.expectMessage(aggregator.toString());
-    thrown.expectMessage("when retrieving Aggregator values for");
-    job.getAggregatorValues(aggregator);
-  }
-
-  private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> {
-    private final CombineFn<InT, ?, OutT> combineFn;
-    private final String name;
-
-    public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) {
-      this.combineFn = combineFn;
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(InT value) {
-      throw new AssertionError();
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<InT, ?, OutT> getCombineFn() {
-      return combineFn;
-    }
-  }
-
   private AppliedPTransform<?, ?, ?> appliedPTransform(
       String fullName, PTransform<PInput, POutput> transform, Pipeline p) {
     PInput input = mock(PInput.class);
@@ -696,7 +360,6 @@ public class DataflowPipelineJobTest {
         p);
   }
 
-
   private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
 
     private long fastNanoTime;

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index da14ee2..32caa9a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -19,18 +19,18 @@ package org.apache.beam.runners.spark.examples;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -44,14 +44,13 @@ public class WordCount {
    * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
    * pipeline.
    */
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+  public static class ExtractWordsFn extends DoFn<String, String> {
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc();
       }
 
       // Split the line into words.