Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:14 UTC

[1/4] beam git commit: Closes #2184

Repository: beam
Updated Branches:
  refs/heads/master 1d97bdffc -> 4fabaef80


Closes #2184


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fabaef8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fabaef8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fabaef8

Branch: refs/heads/master
Commit: 4fabaef806bc8842016c546fa955be8b28c921aa
Parents: 1d97bdf 6f26db8
Author: bchambers <bc...@google.com>
Authored: Tue Apr 25 09:20:26 2017 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |  20 +-
 .../org/apache/beam/examples/WordCount.java     |   9 +-
 .../cookbook/CombinePerKeyExamples.java         |   9 +-
 .../beam/examples/complete/game/GameStats.java  |   8 +-
 .../beam/examples/complete/game/UserScore.java  |   8 +-
 .../beam/runners/apex/ApexRunnerResult.java     |   9 -
 .../runners/apex/examples/WordCountTest.java    |   9 +-
 .../core/DoFnDelegatingAggregatorTest.java      | 144 --------
 .../beam/runners/core/OldDoFnContextTest.java   |  72 ----
 .../apache/beam/runners/core/OldDoFnTest.java   |  32 --
 .../beam/runners/direct/DirectRunner.java       |  37 --
 .../beam/runners/direct/DirectRunnerTest.java   |  36 --
 .../flink/FlinkDetachedRunnerResult.java        |  11 -
 .../beam/runners/flink/FlinkRunnerResult.java   |  21 --
 .../runners/dataflow/DataflowPipelineJob.java   |  27 --
 .../dataflow/DataflowPipelineJobTest.java       | 337 -------------------
 .../beam/runners/spark/SparkPipelineResult.java |   7 -
 .../beam/runners/spark/examples/WordCount.java  |  11 +-
 .../org/apache/beam/sdk/PipelineResult.java     |  12 -
 19 files changed, 34 insertions(+), 785 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: Removing Aggregators from runner-specific examples and tests

Posted by bc...@apache.org.
Removing Aggregators from runner-specific examples and tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fe11d12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fe11d12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fe11d12

Branch: refs/heads/master
Commit: 1fe11d12404b53d978e70ee2c7c37582dcad10c9
Parents: 904b413
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:03:27 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   9 +-
 .../core/DoFnDelegatingAggregatorTest.java      | 144 --------
 .../beam/runners/core/OldDoFnContextTest.java   |  72 ----
 .../apache/beam/runners/core/OldDoFnTest.java   |  32 --
 .../beam/runners/direct/DirectRunner.java       |   3 -
 .../dataflow/DataflowPipelineJobTest.java       | 337 -------------------
 .../beam/runners/spark/examples/WordCount.java  |  13 +-
 8 files changed, 11 insertions(+), 601 deletions(-)
----------------------------------------------------------------------
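
For orientation, here is a minimal sketch of the migration pattern the per-file diffs below apply: a DoFn that previously declared an Aggregator field via createAggregator(...) now declares a Counter from the Metrics API and increments it inside @ProcessElement. The class below mirrors the ExtractWordsFn shape touched in this series but is illustrative only; the namespace argument and the simplified tokenizer are assumptions, not the committed code.

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

/** Illustrative DoFn showing the Aggregator-to-Metrics counter migration. */
class ExtractWordsFn extends DoFn<String, String> {
  // Before this change:
  //   private final Aggregator<Long, Long> emptyLines =
  //       createAggregator("emptyLines", Sum.ofLongs());
  // After: a Counter identified by a namespace and a name.
  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element().trim().isEmpty()) {
      emptyLines.inc();  // replaces emptyLines.addValue(1L)
    }
    // Simplified tokenization for illustration; the real examples use their own pattern.
    for (String word : c.element().split("\\s+")) {
      if (!word.isEmpty()) {
        c.output(word);
      }
    }
  }
}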


http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 4c82f46..e6e3a92 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,7 +95,7 @@ public class DebuggingWordCount {
      * in a dashboard, etc.
      */
     private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
-    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");
+    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index a1713ac..b980715 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -30,15 +30,15 @@ import org.apache.beam.runners.apex.TestApexRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -67,13 +67,12 @@ public class WordCountTest {
 
   static class ExtractWordsFn extends DoFn<String, String> {
     private static final long serialVersionUID = 1L;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+    private final Counter emptyLines = Metrics.counter("main", "emptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc(1);
       }
 
       // Split the line into words.

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
deleted file mode 100644
index b44e8a4..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DelegatingAggregator;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DelegatingAggregator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnDelegatingAggregatorTest {
-
-  @Mock
-  private Aggregator<Long, Long> delegate;
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testAddValueWithoutDelegateThrowsException() {
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("cannot be called");
-    thrown.expectMessage("DoFn");
-
-    aggregator.addValue(21.2);
-  }
-
-  @Test
-  public void testSetDelegateThenAddValueCallsDelegate() {
-    String name = "agg";
-    CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Long, Long> aggregator =
-        (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
-
-    aggregator.setDelegate(delegate);
-
-    aggregator.addValue(12L);
-
-    verify(delegate).addValue(12L);
-  }
-
-  @Test
-  public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    @SuppressWarnings("unchecked")
-    Aggregator<Double, Double> secondDelegate =
-        mock(Aggregator.class, "secondDelegate");
-
-    aggregator.setDelegate(aggregator);
-    aggregator.setDelegate(secondDelegate);
-
-    aggregator.addValue(2.25);
-
-    verify(secondDelegate).addValue(2.25);
-    verify(delegate, never()).addValue(anyLong());
-  }
-
-  @Test
-  public void testGetNameReturnsName() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-  }
-
-  @Test
-  public void testGetCombineFnReturnsCombineFn() {
-    String name = "agg";
-    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    DelegatingAggregator<Double, Double> aggregator =
-        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> CombineFn<T, ?, T> mockCombineFn(
-      @SuppressWarnings("unused") Class<T> clazz) {
-    return mock(CombineFn.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
deleted file mode 100644
index a1cd49d..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link OldDoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnContextTest {
-
-  @Mock
-  private Aggregator<Long, Long> agg;
-
-  private OldDoFn<Object, Object> fn;
-  private OldDoFn<Object, Object>.Context context;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-
-    // Need to be real objects to call the constructor, and to reference the
-    // outer instance of OldDoFn
-    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
-    OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
-    fn = spy(noOpFn);
-    context = spy(noOpContext);
-  }
-
-  @Test
-  public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-    Aggregator<Long, Long> delegateAggregator =
-        fn.createAggregator("test", combiner);
-
-    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
-    context.setupDelegateAggregators();
-    delegateAggregator.addValue(1L);
-
-    verify(agg).addValue(1L);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index 425de07..d6838e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -19,14 +19,11 @@ package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -50,19 +47,6 @@ public class OldDoFnTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testCreateAggregatorWithCombinerSucceeds() {
-    String name = "testAggregator";
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @Test
   public void testCreateAggregatorWithNullNameThrowsException() {
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("name cannot be null");
@@ -114,22 +98,6 @@ public class OldDoFnTest implements Serializable {
   }
 
   @Test
-  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
-    String nameOne = "testAggregator";
-    String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
-    Aggregator<Double, Double> aggregatorOne =
-        doFn.createAggregator(nameOne, combiner);
-    Aggregator<Double, Double> aggregatorTwo =
-        doFn.createAggregator(nameTwo, combiner);
-
-    assertNotEquals(aggregatorOne, aggregatorTwo);
-  }
-
-  @Test
   public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
     OldDoFn<String, String> fn = new OldDoFn<String, String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 45a04a7..77ec68f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,8 +34,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 9cab5e8..59315a7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -19,13 +19,8 @@ package org.apache.beam.runners.dataflow;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
@@ -38,36 +33,22 @@ import static org.mockito.Mockito.when;
 import com.google.api.client.util.NanoClock;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.GetMetrics;
 import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
 import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSetMultimap;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.net.SocketTimeoutException;
 import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.values.PInput;
@@ -367,323 +348,6 @@ public class DataflowPipelineJobTest {
         DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
   }
 
-  @Test
-  public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue()
-      throws IOException, AggregatorRetrievalException {
-    Aggregator<?, ?> aggregator = mock(Aggregator.class);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValues(), empty());
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue()
-      throws IOException, AggregatorRetrievalException {
-    Aggregator<?, ?> aggregator = mock(Aggregator.class);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    jobMetrics.setMetrics(null);
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValues(), empty());
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    MetricUpdate update = new MetricUpdate();
-    long stepValue = 1234L;
-    update.setScalar(new BigDecimal(stepValue));
-
-    MetricStructuredName structuredName = new MetricStructuredName();
-    structuredName.setName(aggregatorName);
-    structuredName.setContext(ImmutableMap.of("step", stepName));
-    update.setName(structuredName);
-
-    jobMetrics.setMetrics(ImmutableList.of(update));
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
-    assertThat(values.getValuesAtSteps().size(), equalTo(1));
-    assertThat(values.getValues(), contains(stepValue));
-    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue)));
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-
-    Pipeline p = Pipeline.create(options);
-
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform, p);
-
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
-    String otherStepName = "s88";
-    String otherFullName = "Spam/Ham/Eggs";
-    AppliedPTransform<?, ?, ?> otherAppliedTransform =
-        appliedPTransform(otherFullName, otherTransform, p);
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
-            aggregator, pTransform, aggregator, otherTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
-            appliedTransform, stepName, otherAppliedTransform, otherStepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    MetricUpdate updateOne = new MetricUpdate();
-    long stepValue = 1234L;
-    updateOne.setScalar(new BigDecimal(stepValue));
-
-    MetricStructuredName structuredNameOne = new MetricStructuredName();
-    structuredNameOne.setName(aggregatorName);
-    structuredNameOne.setContext(ImmutableMap.of("step", stepName));
-    updateOne.setName(structuredNameOne);
-
-    MetricUpdate updateTwo = new MetricUpdate();
-    long stepValueTwo = 1024L;
-    updateTwo.setScalar(new BigDecimal(stepValueTwo));
-
-    MetricStructuredName structuredNameTwo = new MetricStructuredName();
-    structuredNameTwo.setName(aggregatorName);
-    structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName));
-    updateTwo.setName(structuredNameTwo);
-
-    jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
-    assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo));
-    assertThat(values.getValuesAtSteps().size(), equalTo(2));
-    assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo));
-    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo)));
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    JobMetrics jobMetrics = new JobMetrics();
-    when(getMetrics.execute()).thenReturn(jobMetrics);
-
-    MetricUpdate ignoredUpdate = new MetricUpdate();
-    ignoredUpdate.setScalar(null);
-
-    MetricStructuredName ignoredName = new MetricStructuredName();
-    ignoredName.setName("ignoredAggregator.elementCount.out0");
-    ignoredName.setContext(null);
-    ignoredUpdate.setName(ignoredName);
-
-    jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
-    assertThat(values.getValuesAtSteps().entrySet(), empty());
-    assertThat(values.getValues(), empty());
-  }
-
-  @Test
-  public void testGetAggregatorValuesWithUnusedAggregatorThrowsException()
-      throws AggregatorRetrievalException {
-    Aggregator<?, ?> aggregator = mock(Aggregator.class);
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
-
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("not used in this pipeline");
-    job.getAggregatorValues(aggregator);
-  }
-
-  @Test
-  public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
-      throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
-    String aggregatorName = "agg";
-    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-    @SuppressWarnings("unchecked")
-    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
-    String stepName = "s1";
-    String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
-    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
-        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
-        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
-    GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
-    IOException cause = new IOException();
-    when(getMetrics.execute()).thenThrow(cause);
-
-    Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
-    Job modelJob = new Job();
-    when(getState.execute()).thenReturn(modelJob);
-    modelJob.setCurrentState(State.RUNNING.toString());
-
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
-    thrown.expect(AggregatorRetrievalException.class);
-    thrown.expectCause(is(cause));
-    thrown.expectMessage(aggregator.toString());
-    thrown.expectMessage("when retrieving Aggregator values for");
-    job.getAggregatorValues(aggregator);
-  }
-
-  private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> {
-    private final CombineFn<InT, ?, OutT> combineFn;
-    private final String name;
-
-    public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) {
-      this.combineFn = combineFn;
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(InT value) {
-      throw new AssertionError();
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<InT, ?, OutT> getCombineFn() {
-      return combineFn;
-    }
-  }
-
   private AppliedPTransform<?, ?, ?> appliedPTransform(
       String fullName, PTransform<PInput, POutput> transform, Pipeline p) {
     PInput input = mock(PInput.class);
@@ -696,7 +360,6 @@ public class DataflowPipelineJobTest {
         p);
   }
 
-
   private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
 
     private long fastNanoTime;

http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index da14ee2..32caa9a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -19,18 +19,18 @@ package org.apache.beam.runners.spark.examples;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -44,14 +44,13 @@ public class WordCount {
    * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
    * pipeline.
    */
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+  public static class ExtractWordsFn extends DoFn<String, String> {
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc();
       }
 
       // Split the line into words.


[3/4] beam git commit: Removing Aggregators from PipelineResults and subclasses

Posted by bc...@apache.org.
Removing Aggregators from PipelineResults and subclasses


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f26db8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f26db8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f26db8c

Branch: refs/heads/master
Commit: 6f26db8c479c9543003fd3bb8406117e25c4fed0
Parents: 1fe11d1
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:06:15 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/apex/ApexRunnerResult.java     |  9 -----
 .../beam/runners/direct/DirectRunner.java       | 34 ------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 36 --------------------
 .../flink/FlinkDetachedRunnerResult.java        | 11 ------
 .../beam/runners/flink/FlinkRunnerResult.java   | 21 ------------
 .../runners/dataflow/DataflowPipelineJob.java   | 27 ---------------
 .../beam/runners/spark/SparkPipelineResult.java |  7 ----
 .../beam/runners/spark/examples/WordCount.java  |  2 +-
 .../org/apache/beam/sdk/PipelineResult.java     | 12 -------
 9 files changed, 1 insertion(+), 158 deletions(-)
----------------------------------------------------------------------
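
These diffs remove getAggregatorValues(...) from PipelineResult and from each runner's result class, leaving MetricResults (obtained via PipelineResult.metrics()) as the way to read counter values after a run. Below is a hedged sketch of that reading side, assuming the MetricsFilter / MetricQueryResults query API available in the SDK around this time; accessor names changed in later releases, so treat the exact calls as assumptions rather than the committed code.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

class EmptyLinesReport {
  /** Runs the pipeline and prints the "emptyLines" counter instead of querying an Aggregator. */
  static void runAndReport(Pipeline pipeline) {
    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    // Previously: result.getAggregatorValues(emptyLinesAggregator).getValues()
    // Namespace/name here match the Apex WordCountTest diff above ("main", "emptyLines").
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named("main", "emptyLines"))
            .build());

    for (MetricResult<Long> counter : metrics.counters()) {
      System.out.println(counter.name() + " attempted=" + counter.attempted());
    }
  }
}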


http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 8548194..41fdb75 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -23,12 +23,9 @@ import java.io.IOException;
 
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.ShutdownMode;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 /**
@@ -50,12 +47,6 @@ public class ApexRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    return null;
-  }
-
-  @Override
   public State cancel() throws IOException {
     apexApp.shutdown(ShutdownMode.KILL);
     state = State.CANCELLED;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 77ec68f..db2d252 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -45,7 +44,6 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -375,38 +373,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     }
 
     @Override
-    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-        throws AggregatorRetrievalException {
-      AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
-      Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
-      final Map<String, T> stepValues = new HashMap<>();
-      if (steps != null) {
-        for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
-          if (steps.contains(transform.getTransform())) {
-            T aggregate = aggregators
-                .getAggregate(evaluationContext.getStepName(transform), aggregator.getName());
-            if (aggregate != null) {
-              stepValues.put(transform.getFullName(), aggregate);
-            }
-          }
-        }
-      }
-      return new AggregatorValues<T>() {
-        @Override
-        public Map<String, T> getValuesAtSteps() {
-          return stepValues;
-        }
-
-        @Override
-        public String toString() {
-          return MoreObjects.toStringHelper(this)
-              .add("stepValues", stepValues)
-              .toString();
-        }
-      };
-    }
-
-    @Override
     public MetricResults metrics() {
       return evaluationContext.getMetrics();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index c55f84a..51ae12a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
-import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -48,7 +47,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -60,7 +58,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -69,13 +66,9 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -530,35 +523,6 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  @Test
-  public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
-    Pipeline p = getPipeline();
-    IdentityDoFn identityDoFn = new IdentityDoFn();
-    p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", "element3")))
-        .apply(ParDo.of(identityDoFn));
-    PipelineResult pipelineResult = p.run();
-    pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
-  }
-
-  private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
-    private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs());
-    private static final String STATE_ID = "state";
-    @StateId(STATE_ID)
-    private static final StateSpec<Object, ValueState<String>> stateSpec =
-        StateSpecs.value(StringUtf8Coder.of());
-
-    @ProcessElement
-    public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state){
-      state.write("state content");
-      counter.addValue(1L);
-      context.output(context.element().getValue());
-    }
-
-    public Aggregator<Long, Long> getCounter() {
-      return counter;
-    }
-  }
-
   private static class LongNoDecodeCoder extends CustomCoder<Long> {
     @Override
     public void encode(

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
index bf4395f..b4d4b08 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -19,11 +19,8 @@ package org.apache.beam.runners.flink;
 
 import java.io.IOException;
 
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 
@@ -41,14 +38,6 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    throw new AggregatorRetrievalException(
-        "Accumulators can't be retrieved for detached Job executions.",
-        new UnsupportedOperationException());
-  }
-
-  @Override
   public MetricResults metrics() {
     throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 0f2462d..dfc1d8e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -21,11 +21,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 /**
@@ -52,24 +49,6 @@ public class FlinkRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    // TODO provide a list of all accumulator step values
-    Object value = aggregators.get(aggregator.getName());
-    if (value != null) {
-      return new AggregatorValues<T>() {
-        @Override
-        public Map<String, T> getValuesAtSteps() {
-          return (Map<String, T>) aggregators;
-        }
-      };
-    } else {
-      throw new AggregatorRetrievalException("Accumulator results not found.",
-          new RuntimeException("Accumulator does not exist."));
-    }
-  }
-
-  @Override
   public String toString() {
     return "FlinkRunnerResult{"
         + "aggregators=" + aggregators

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 7cb0f0e..0399ada 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -29,7 +29,6 @@ import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.List;
@@ -41,8 +40,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -488,30 +485,6 @@ public class DataflowPipelineJob implements PipelineResult {
   }
 
   @Override
-  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
-      throws AggregatorRetrievalException {
-    try {
-      final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator);
-      return new AggregatorValues<OutputT>() {
-        @Override
-        public Map<String, OutputT> getValuesAtSteps() {
-          return stepValues;
-        }
-
-        @Override
-        public String toString() {
-          return MoreObjects.toStringHelper(this)
-              .add("stepValues", stepValues)
-              .toString();
-        }
-      };
-    } catch (IOException e) {
-      throw new AggregatorRetrievalException(
-          "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
-    }
-  }
-
-  @Override
   public MetricResults metrics() {
     return dataflowMetrics;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index d2c5c8e..1110a55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -27,11 +27,9 @@ import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricResults;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -84,11 +82,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) {
-    return SparkAggregators.valueOf(aggregator);
-  }
-
-  @Override
   public PipelineResult.State getState() {
     return state;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 32caa9a..de5ae48 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -21,10 +21,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index 35f11eb..7e78e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 /**
@@ -64,17 +63,6 @@ public interface PipelineResult {
    */
   State waitUntilFinish();
 
-  /**
-   * Retrieves the current value of the provided {@link Aggregator}.
-   *
-   * @param aggregator the {@link Aggregator} to retrieve values for.
-   * @return the current values of the {@link Aggregator},
-   * which may be empty if there are no values yet.
-   * @throws AggregatorRetrievalException if the {@link Aggregator} values could not be retrieved.
-   */
-  <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException;
-
   // TODO: method to retrieve error messages.
 
   /** Named constants for common values for the job state. */


[4/4] beam git commit: Removing Aggregators from Examples

Posted by bc...@apache.org.
Removing Aggregators from Examples


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/904b4130
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/904b4130
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/904b4130

Branch: refs/heads/master
Commit: 904b4130b8ba4e9edc0c776da99cbe46d00d9442
Parents: 1d97bdf
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 12:50:38 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       | 20 +++++++++-----------
 .../org/apache/beam/examples/WordCount.java     |  9 ++++-----
 .../cookbook/CombinePerKeyExamples.java         |  9 ++++-----
 .../beam/examples/complete/game/GameStats.java  |  8 ++++----
 .../beam/examples/complete/game/UserScore.java  |  8 ++++----
 5 files changed, 25 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 031f317..4c82f46 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -22,14 +22,14 @@ import java.util.List;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
  * <p>New Concepts:
  * <pre>
  *   1. Logging using SLF4J, even in a distributed environment
- *   2. Creating a custom aggregator (runners have varying levels of support)
+ *   2. Creating a custom metric (runners have varying levels of support)
  *   3. Testing your Pipeline via PAssert
  * </pre>
  *
@@ -90,14 +90,12 @@ public class DebuggingWordCount {
     }
 
     /**
-     * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
-     * runner provides varying levels of support for aggregators, and may expose them
+     * Concept #2: A custom metric can track values in your pipeline as it runs. Each
+     * runner provides varying levels of support for metrics, and may expose them
      * in a dashboard, etc.
      */
-    private final Aggregator<Long, Long> matchedWords =
-        createAggregator("matchedWords", Sum.ofLongs());
-    private final Aggregator<Long, Long> unmatchedWords =
-        createAggregator("unmatchedWords", Sum.ofLongs());
+    private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
+    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
@@ -105,14 +103,14 @@ public class DebuggingWordCount {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
         // these log lines will appear only if the log level is set to "DEBUG" or lower.
         LOG.debug("Matched: " + c.element().getKey());
-        matchedWords.addValue(1L);
+        matchedWords.inc();
         c.output(c.element());
       } else {
         // Log at the "TRACE" level each element that is not matched. Different log levels
         // can be used to control the verbosity of logging providing an effective mechanism
         // to filter less important information.
         LOG.trace("Did not match: " + c.element().getKey());
-        unmatchedWords.addValue(1L);
+        unmatchedWords.inc();
       }
     }
   }
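
The DoFn-side pattern used across these files is small enough to sketch on its own. Counter is the direct replacement for a Sum.ofLongs() Aggregator, and the same Metrics factory also provides Distribution for recording per-element value statistics; the class and metric names below are placeholders for illustration:

  import org.apache.beam.sdk.metrics.Counter;
  import org.apache.beam.sdk.metrics.Distribution;
  import org.apache.beam.sdk.metrics.Metrics;
  import org.apache.beam.sdk.transforms.DoFn;

  public class MetricsInDoFnExample extends DoFn<String, String> {
    // Declared as instance fields, just like the Aggregators they replace.
    private final Counter nonEmptyLines =
        Metrics.counter(MetricsInDoFnExample.class, "nonEmptyLines");
    private final Distribution lineLengths =
        Metrics.distribution(MetricsInDoFnExample.class, "lineLengths");

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (!c.element().trim().isEmpty()) {
        nonEmptyLines.inc();                    // was: aggregator.addValue(1L)
      }
      lineLengths.update(c.element().length()); // record a per-element value
      c.output(c.element());
    }
  }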

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 7e21d47..0c786bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -19,19 +19,19 @@ package org.apache.beam.examples;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -86,13 +86,12 @@ public class WordCount {
    * to a ParDo in the pipeline.
    */
   static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc();
       }
 
       // Split the line into words.

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 8d13b90..39553a5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -24,18 +24,18 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -79,8 +79,7 @@ public class CombinePerKeyExamples {
    * outputs word, play_name.
    */
   static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
-    private final Aggregator<Long, Long> smallerWords =
-        createAggregator("smallerWords", Sum.ofLongs());
+    private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
 
     @ProcessElement
     public void processElement(ProcessContext c){
@@ -92,7 +91,7 @@ public class CombinePerKeyExamples {
       } else {
         // Track how many smaller words we're not including. This information will be
         // visible in the Monitoring UI.
-        smallerWords.addValue(1L);
+        smallerWords.inc();
       }
     }
   }
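
One caveat on "visible in the Monitoring UI": what a runner reports varies. MetricResult exposes both an attempted and a committed value, and runners that do not track committed results may throw UnsupportedOperationException for the latter, so a defensive read looks roughly like this (a sketch, not code from this change):

  import org.apache.beam.sdk.metrics.MetricResult;

  public class CounterReadExample {
    /** Prefers the committed value, falling back to attempted where unsupported. */
    public static long bestEffortValue(MetricResult<Long> counter) {
      try {
        return counter.committed();
      } catch (UnsupportedOperationException e) {
        return counter.attempted();
      }
    }
  }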

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 9c79fad..b6c05be 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -26,10 +26,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -126,8 +127,7 @@ public class GameStats extends LeaderBoard {
           .apply("ProcessAndFilter", ParDo
               // use the derived mean total score as a side input
               .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-                private final Aggregator<Long, Long> numSpammerUsers =
-                  createAggregator("SpammerUsers", Sum.ofLongs());
+                private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
                 @ProcessElement
                 public void processElement(ProcessContext c) {
                   Integer score = c.element().getValue();
@@ -135,7 +135,7 @@ public class GameStats extends LeaderBoard {
                   if (score > (gmc * SCORE_WEIGHT)) {
                     LOG.info("user " + c.element().getKey() + " spammer score " + score
                         + " with mean " + gmc);
-                    numSpammerUsers.addValue(1L);
+                    numSpammerUsers.inc();
                     c.output(c.element());
                   }
                 }
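
Note the namespace difference: GameStats (and UserScore below) pass the string "main" as the metric namespace, while the other examples pass the enclosing DoFn class, whose name becomes the namespace. The string form is convenient inside anonymous DoFns like the one above; both overloads are shown side by side here with placeholder names:

  import org.apache.beam.sdk.metrics.Counter;
  import org.apache.beam.sdk.metrics.Metrics;

  public class NamespaceExample {
    // String namespace: handy when there is no convenient enclosing class.
    private final Counter byString = Metrics.counter("main", "SpammerUsers");
    // Class namespace: reported under NamespaceExample's class name.
    private final Counter byClass = Metrics.counter(NamespaceExample.class, "SpammerUsers");
  }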

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index b4b023f..0adaabc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -25,12 +25,13 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -125,8 +126,7 @@ public class UserScore {
 
     // Log and count parse errors.
     private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
-    private final Aggregator<Long, Long> numParseErrors =
-        createAggregator("ParseErrors", Sum.ofLongs());
+    private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
@@ -139,7 +139,7 @@ public class UserScore {
         GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
         c.output(gInfo);
       } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
-        numParseErrors.addValue(1L);
+        numParseErrors.inc();
         LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
       }
     }