You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:14 UTC
[1/4] beam git commit: Closes #2184
Repository: beam
Updated Branches:
refs/heads/master 1d97bdffc -> 4fabaef80
Closes #2184
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fabaef8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fabaef8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fabaef8
Branch: refs/heads/master
Commit: 4fabaef806bc8842016c546fa955be8b28c921aa
Parents: 1d97bdf 6f26db8
Author: bchambers <bc...@google.com>
Authored: Tue Apr 25 09:20:26 2017 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 20 +-
.../org/apache/beam/examples/WordCount.java | 9 +-
.../cookbook/CombinePerKeyExamples.java | 9 +-
.../beam/examples/complete/game/GameStats.java | 8 +-
.../beam/examples/complete/game/UserScore.java | 8 +-
.../beam/runners/apex/ApexRunnerResult.java | 9 -
.../runners/apex/examples/WordCountTest.java | 9 +-
.../core/DoFnDelegatingAggregatorTest.java | 144 --------
.../beam/runners/core/OldDoFnContextTest.java | 72 ----
.../apache/beam/runners/core/OldDoFnTest.java | 32 --
.../beam/runners/direct/DirectRunner.java | 37 --
.../beam/runners/direct/DirectRunnerTest.java | 36 --
.../flink/FlinkDetachedRunnerResult.java | 11 -
.../beam/runners/flink/FlinkRunnerResult.java | 21 --
.../runners/dataflow/DataflowPipelineJob.java | 27 --
.../dataflow/DataflowPipelineJobTest.java | 337 -------------------
.../beam/runners/spark/SparkPipelineResult.java | 7 -
.../beam/runners/spark/examples/WordCount.java | 11 +-
.../org/apache/beam/sdk/PipelineResult.java | 12 -
19 files changed, 34 insertions(+), 785 deletions(-)
----------------------------------------------------------------------
[2/4] beam git commit: Removing Aggregators from runner-specific
examples and tests
Posted by bc...@apache.org.
Removing Aggregators from runner-specific examples and tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fe11d12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fe11d12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fe11d12
Branch: refs/heads/master
Commit: 1fe11d12404b53d978e70ee2c7c37582dcad10c9
Parents: 904b413
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:03:27 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 9 +-
.../core/DoFnDelegatingAggregatorTest.java | 144 --------
.../beam/runners/core/OldDoFnContextTest.java | 72 ----
.../apache/beam/runners/core/OldDoFnTest.java | 32 --
.../beam/runners/direct/DirectRunner.java | 3 -
.../dataflow/DataflowPipelineJobTest.java | 337 -------------------
.../beam/runners/spark/examples/WordCount.java | 13 +-
8 files changed, 11 insertions(+), 601 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 4c82f46..e6e3a92 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,7 +95,7 @@ public class DebuggingWordCount {
* in a dashboard, etc.
*/
private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
- private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");
+ private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index a1713ac..b980715 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -30,15 +30,15 @@ import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -67,13 +67,12 @@ public class WordCountTest {
static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
+ private final Counter emptyLines = Metrics.counter("main", "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
+ emptyLines.inc(1);
}
// Split the line into words.
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
deleted file mode 100644
index b44e8a4..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DelegatingAggregator;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DelegatingAggregator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnDelegatingAggregatorTest {
-
- @Mock
- private Aggregator<Long, Long> delegate;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testAddValueWithoutDelegateThrowsException() {
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("cannot be called");
- thrown.expectMessage("DoFn");
-
- aggregator.addValue(21.2);
- }
-
- @Test
- public void testSetDelegateThenAddValueCallsDelegate() {
- String name = "agg";
- CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Long, Long> aggregator =
- (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
-
- aggregator.setDelegate(delegate);
-
- aggregator.addValue(12L);
-
- verify(delegate).addValue(12L);
- }
-
- @Test
- public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- @SuppressWarnings("unchecked")
- Aggregator<Double, Double> secondDelegate =
- mock(Aggregator.class, "secondDelegate");
-
- aggregator.setDelegate(aggregator);
- aggregator.setDelegate(secondDelegate);
-
- aggregator.addValue(2.25);
-
- verify(secondDelegate).addValue(2.25);
- verify(delegate, never()).addValue(anyLong());
- }
-
- @Test
- public void testGetNameReturnsName() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- }
-
- @Test
- public void testGetCombineFnReturnsCombineFn() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @SuppressWarnings("unchecked")
- private static <T> CombineFn<T, ?, T> mockCombineFn(
- @SuppressWarnings("unused") Class<T> clazz) {
- return mock(CombineFn.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
deleted file mode 100644
index a1cd49d..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link OldDoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnContextTest {
-
- @Mock
- private Aggregator<Long, Long> agg;
-
- private OldDoFn<Object, Object> fn;
- private OldDoFn<Object, Object>.Context context;
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
-
- // Need to be real objects to call the constructor, and to reference the
- // outer instance of OldDoFn
- NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
- OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
- fn = spy(noOpFn);
- context = spy(noOpContext);
- }
-
- @Test
- public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
- Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
- Aggregator<Long, Long> delegateAggregator =
- fn.createAggregator("test", combiner);
-
- when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
- context.setupDelegateAggregators();
- delegateAggregator.addValue(1L);
-
- verify(agg).addValue(1L);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index 425de07..d6838e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -19,14 +19,11 @@ package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -50,19 +47,6 @@ public class OldDoFnTest implements Serializable {
public transient ExpectedException thrown = ExpectedException.none();
@Test
- public void testCreateAggregatorWithCombinerSucceeds() {
- String name = "testAggregator";
- Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @Test
public void testCreateAggregatorWithNullNameThrowsException() {
thrown.expect(NullPointerException.class);
thrown.expectMessage("name cannot be null");
@@ -114,22 +98,6 @@ public class OldDoFnTest implements Serializable {
}
@Test
- public void testCreateAggregatorsWithDifferentNamesSucceeds() {
- String nameOne = "testAggregator";
- String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- Aggregator<Double, Double> aggregatorOne =
- doFn.createAggregator(nameOne, combiner);
- Aggregator<Double, Double> aggregatorTwo =
- doFn.createAggregator(nameTwo, combiner);
-
- assertNotEquals(aggregatorOne, aggregatorTwo);
- }
-
- @Test
public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
OldDoFn<String, String> fn = new OldDoFn<String, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 45a04a7..77ec68f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,8 +34,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 9cab5e8..59315a7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -19,13 +19,8 @@ package org.apache.beam.runners.dataflow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
@@ -38,36 +33,22 @@ import static org.mockito.Mockito.when;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.GetMetrics;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSetMultimap;
import java.io.IOException;
-import java.math.BigDecimal;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.values.PInput;
@@ -367,323 +348,6 @@ public class DataflowPipelineJobTest {
DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
}
- @Test
- public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue()
- throws IOException, AggregatorRetrievalException {
- Aggregator<?, ?> aggregator = mock(Aggregator.class);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValues(), empty());
- }
-
- @Test
- public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue()
- throws IOException, AggregatorRetrievalException {
- Aggregator<?, ?> aggregator = mock(Aggregator.class);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- jobMetrics.setMetrics(null);
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValues(), empty());
- }
-
- @Test
- public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- MetricUpdate update = new MetricUpdate();
- long stepValue = 1234L;
- update.setScalar(new BigDecimal(stepValue));
-
- MetricStructuredName structuredName = new MetricStructuredName();
- structuredName.setName(aggregatorName);
- structuredName.setContext(ImmutableMap.of("step", stepName));
- update.setName(structuredName);
-
- jobMetrics.setMetrics(ImmutableList.of(update));
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
- assertThat(values.getValuesAtSteps().size(), equalTo(1));
- assertThat(values.getValues(), contains(stepValue));
- assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue)));
- }
-
- @Test
- public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-
- Pipeline p = Pipeline.create(options);
-
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform, p);
-
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
- String otherStepName = "s88";
- String otherFullName = "Spam/Ham/Eggs";
- AppliedPTransform<?, ?, ?> otherAppliedTransform =
- appliedPTransform(otherFullName, otherTransform, p);
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
- aggregator, pTransform, aggregator, otherTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
- appliedTransform, stepName, otherAppliedTransform, otherStepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- MetricUpdate updateOne = new MetricUpdate();
- long stepValue = 1234L;
- updateOne.setScalar(new BigDecimal(stepValue));
-
- MetricStructuredName structuredNameOne = new MetricStructuredName();
- structuredNameOne.setName(aggregatorName);
- structuredNameOne.setContext(ImmutableMap.of("step", stepName));
- updateOne.setName(structuredNameOne);
-
- MetricUpdate updateTwo = new MetricUpdate();
- long stepValueTwo = 1024L;
- updateTwo.setScalar(new BigDecimal(stepValueTwo));
-
- MetricStructuredName structuredNameTwo = new MetricStructuredName();
- structuredNameTwo.setName(aggregatorName);
- structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName));
- updateTwo.setName(structuredNameTwo);
-
- jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
- assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo));
- assertThat(values.getValuesAtSteps().size(), equalTo(2));
- assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo));
- assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo)));
- }
-
- @Test
- public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- MetricUpdate ignoredUpdate = new MetricUpdate();
- ignoredUpdate.setScalar(null);
-
- MetricStructuredName ignoredName = new MetricStructuredName();
- ignoredName.setName("ignoredAggregator.elementCount.out0");
- ignoredName.setContext(null);
- ignoredUpdate.setName(ignoredName);
-
- jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValuesAtSteps().entrySet(), empty());
- assertThat(values.getValues(), empty());
- }
-
- @Test
- public void testGetAggregatorValuesWithUnusedAggregatorThrowsException()
- throws AggregatorRetrievalException {
- Aggregator<?, ?> aggregator = mock(Aggregator.class);
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("not used in this pipeline");
- job.getAggregatorValues(aggregator);
- }
-
- @Test
- public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- IOException cause = new IOException();
- when(getMetrics.execute()).thenThrow(cause);
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- thrown.expect(AggregatorRetrievalException.class);
- thrown.expectCause(is(cause));
- thrown.expectMessage(aggregator.toString());
- thrown.expectMessage("when retrieving Aggregator values for");
- job.getAggregatorValues(aggregator);
- }
-
- private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> {
- private final CombineFn<InT, ?, OutT> combineFn;
- private final String name;
-
- public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) {
- this.combineFn = combineFn;
- this.name = name;
- }
-
- @Override
- public void addValue(InT value) {
- throw new AssertionError();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<InT, ?, OutT> getCombineFn() {
- return combineFn;
- }
- }
-
private AppliedPTransform<?, ?, ?> appliedPTransform(
String fullName, PTransform<PInput, POutput> transform, Pipeline p) {
PInput input = mock(PInput.class);
@@ -696,7 +360,6 @@ public class DataflowPipelineJobTest {
p);
}
-
private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
private long fastNanoTime;
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index da14ee2..32caa9a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -19,18 +19,18 @@ package org.apache.beam.runners.spark.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -44,14 +44,13 @@ public class WordCount {
* of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
* pipeline.
*/
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
+ public static class ExtractWordsFn extends DoFn<String, String> {
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
+ emptyLines.inc();
}
// Split the line into words.
[3/4] beam git commit: Removing Aggregators from PipelineResults and
subclasses
Posted by bc...@apache.org.
Removing Aggregators from PipelineResults and subclasses
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f26db8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f26db8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f26db8c
Branch: refs/heads/master
Commit: 6f26db8c479c9543003fd3bb8406117e25c4fed0
Parents: 1fe11d1
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:06:15 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700
----------------------------------------------------------------------
.../beam/runners/apex/ApexRunnerResult.java | 9 -----
.../beam/runners/direct/DirectRunner.java | 34 ------------------
.../beam/runners/direct/DirectRunnerTest.java | 36 --------------------
.../flink/FlinkDetachedRunnerResult.java | 11 ------
.../beam/runners/flink/FlinkRunnerResult.java | 21 ------------
.../runners/dataflow/DataflowPipelineJob.java | 27 ---------------
.../beam/runners/spark/SparkPipelineResult.java | 7 ----
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../org/apache/beam/sdk/PipelineResult.java | 12 -------
9 files changed, 1 insertion(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 8548194..41fdb75 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -23,12 +23,9 @@ import java.io.IOException;
import org.apache.apex.api.Launcher.AppHandle;
import org.apache.apex.api.Launcher.ShutdownMode;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
/**
@@ -50,12 +47,6 @@ public class ApexRunnerResult implements PipelineResult {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- return null;
- }
-
- @Override
public State cancel() throws IOException {
apexApp.shutdown(ShutdownMode.KILL);
state = State.CANCELLED;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 77ec68f..db2d252 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import com.google.common.base.MoreObjects;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -45,7 +44,6 @@ import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -375,38 +373,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
- Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
- final Map<String, T> stepValues = new HashMap<>();
- if (steps != null) {
- for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
- if (steps.contains(transform.getTransform())) {
- T aggregate = aggregators
- .getAggregate(evaluationContext.getStepName(transform), aggregator.getName());
- if (aggregate != null) {
- stepValues.put(transform.getFullName(), aggregate);
- }
- }
- }
- }
- return new AggregatorValues<T>() {
- @Override
- public Map<String, T> getValuesAtSteps() {
- return stepValues;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("stepValues", stepValues)
- .toString();
- }
- };
- }
-
- @Override
public MetricResults metrics() {
return evaluationContext.getMetrics();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index c55f84a..51ae12a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
-import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
@@ -48,7 +47,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -60,7 +58,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -69,13 +66,9 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -530,35 +523,6 @@ public class DirectRunnerTest implements Serializable {
p.run();
}
- @Test
- public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
- Pipeline p = getPipeline();
- IdentityDoFn identityDoFn = new IdentityDoFn();
- p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", "element3")))
- .apply(ParDo.of(identityDoFn));
- PipelineResult pipelineResult = p.run();
- pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
- }
-
- private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
- private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs());
- private static final String STATE_ID = "state";
- @StateId(STATE_ID)
- private static final StateSpec<Object, ValueState<String>> stateSpec =
- StateSpecs.value(StringUtf8Coder.of());
-
- @ProcessElement
- public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state){
- state.write("state content");
- counter.addValue(1L);
- context.output(context.element().getValue());
- }
-
- public Aggregator<Long, Long> getCounter() {
- return counter;
- }
- }
-
private static class LongNoDecodeCoder extends CustomCoder<Long> {
@Override
public void encode(
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
index bf4395f..b4d4b08 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -19,11 +19,8 @@ package org.apache.beam.runners.flink;
import java.io.IOException;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
@@ -41,14 +38,6 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- throw new AggregatorRetrievalException(
- "Accumulators can't be retrieved for detached Job executions.",
- new UnsupportedOperationException());
- }
-
- @Override
public MetricResults metrics() {
throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 0f2462d..dfc1d8e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -21,11 +21,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
/**
@@ -52,24 +49,6 @@ public class FlinkRunnerResult implements PipelineResult {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- // TODO provide a list of all accumulator step values
- Object value = aggregators.get(aggregator.getName());
- if (value != null) {
- return new AggregatorValues<T>() {
- @Override
- public Map<String, T> getValuesAtSteps() {
- return (Map<String, T>) aggregators;
- }
- };
- } else {
- throw new AggregatorRetrievalException("Accumulator results not found.",
- new RuntimeException("Accumulator does not exist."));
- }
- }
-
- @Override
public String toString() {
return "FlinkRunnerResult{"
+ "aggregators=" + aggregators
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 7cb0f0e..0399ada 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -29,7 +29,6 @@ import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
@@ -41,8 +40,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -488,30 +485,6 @@ public class DataflowPipelineJob implements PipelineResult {
}
@Override
- public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
- throws AggregatorRetrievalException {
- try {
- final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator);
- return new AggregatorValues<OutputT>() {
- @Override
- public Map<String, OutputT> getValuesAtSteps() {
- return stepValues;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("stepValues", stepValues)
- .toString();
- }
- };
- } catch (IOException e) {
- throw new AggregatorRetrievalException(
- "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
- }
- }
-
- @Override
public MetricResults metrics() {
return dataflowMetrics;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index d2c5c8e..1110a55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -27,11 +27,9 @@ import java.util.concurrent.TimeoutException;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.metrics.SparkMetricResults;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
@@ -84,11 +82,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) {
- return SparkAggregators.valueOf(aggregator);
- }
-
- @Override
public PipelineResult.State getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 32caa9a..de5ae48 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -21,10 +21,10 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index 35f11eb..7e78e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
/**
@@ -64,17 +63,6 @@ public interface PipelineResult {
*/
State waitUntilFinish();
- /**
- * Retrieves the current value of the provided {@link Aggregator}.
- *
- * @param aggregator the {@link Aggregator} to retrieve values for.
- * @return the current values of the {@link Aggregator},
- * which may be empty if there are no values yet.
- * @throws AggregatorRetrievalException if the {@link Aggregator} values could not be retrieved.
- */
- <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException;
-
// TODO: method to retrieve error messages.
/** Named constants for common values for the job state. */
[4/4] beam git commit: Removing Aggregators from Examples
Posted by bc...@apache.org.
Removing Aggregators from Examples
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/904b4130
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/904b4130
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/904b4130
Branch: refs/heads/master
Commit: 904b4130b8ba4e9edc0c776da99cbe46d00d9442
Parents: 1d97bdf
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 12:50:38 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 20 +++++++++-----------
.../org/apache/beam/examples/WordCount.java | 9 ++++-----
.../cookbook/CombinePerKeyExamples.java | 9 ++++-----
.../beam/examples/complete/game/GameStats.java | 8 ++++----
.../beam/examples/complete/game/UserScore.java | 8 ++++----
5 files changed, 25 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 031f317..4c82f46 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -22,14 +22,14 @@ import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
* <p>New Concepts:
* <pre>
* 1. Logging using SLF4J, even in a distributed environment
- * 2. Creating a custom aggregator (runners have varying levels of support)
+ * 2. Creating a custom metric (runners have varying levels of support)
* 3. Testing your Pipeline via PAssert
* </pre>
*
@@ -90,14 +90,12 @@ public class DebuggingWordCount {
}
/**
- * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
- * runner provides varying levels of support for aggregators, and may expose them
+ * Concept #2: A custom metric can track values in your pipeline as it runs. Each
+ * runner provides varying levels of support for metrics, and may expose them
* in a dashboard, etc.
*/
- private final Aggregator<Long, Long> matchedWords =
- createAggregator("matchedWords", Sum.ofLongs());
- private final Aggregator<Long, Long> unmatchedWords =
- createAggregator("unmatchedWords", Sum.ofLongs());
+ private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
+ private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");
@ProcessElement
public void processElement(ProcessContext c) {
@@ -105,14 +103,14 @@ public class DebuggingWordCount {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
LOG.debug("Matched: " + c.element().getKey());
- matchedWords.addValue(1L);
+ matchedWords.inc();
c.output(c.element());
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
LOG.trace("Did not match: " + c.element().getKey());
- unmatchedWords.addValue(1L);
+ unmatchedWords.inc();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 7e21d47..0c786bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -19,19 +19,19 @@ package org.apache.beam.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -86,13 +86,12 @@ public class WordCount {
* to a ParDo in the pipeline.
*/
static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
+ emptyLines.inc();
}
// Split the line into words.
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 8d13b90..39553a5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -24,18 +24,18 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -79,8 +79,7 @@ public class CombinePerKeyExamples {
* outputs word, play_name.
*/
static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
- private final Aggregator<Long, Long> smallerWords =
- createAggregator("smallerWords", Sum.ofLongs());
+ private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
@ProcessElement
public void processElement(ProcessContext c){
@@ -92,7 +91,7 @@ public class CombinePerKeyExamples {
} else {
// Track how many smaller words we're not including. This information will be
// visible in the Monitoring UI.
- smallerWords.addValue(1L);
+ smallerWords.inc();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 9c79fad..b6c05be 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -26,10 +26,11 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -126,8 +127,7 @@ public class GameStats extends LeaderBoard {
.apply("ProcessAndFilter", ParDo
// use the derived mean total score as a side input
.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
- private final Aggregator<Long, Long> numSpammerUsers =
- createAggregator("SpammerUsers", Sum.ofLongs());
+ private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
@ProcessElement
public void processElement(ProcessContext c) {
Integer score = c.element().getValue();
@@ -135,7 +135,7 @@ public class GameStats extends LeaderBoard {
if (score > (gmc * SCORE_WEIGHT)) {
LOG.info("user " + c.element().getKey() + " spammer score " + score
+ " with mean " + gmc);
- numSpammerUsers.addValue(1L);
+ numSpammerUsers.inc();
c.output(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index b4b023f..0adaabc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -25,12 +25,13 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -125,8 +126,7 @@ public class UserScore {
// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
- private final Aggregator<Long, Long> numParseErrors =
- createAggregator("ParseErrors", Sum.ofLongs());
+ private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");
@ProcessElement
public void processElement(ProcessContext c) {
@@ -139,7 +139,7 @@ public class UserScore {
GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
c.output(gInfo);
} catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
- numParseErrors.addValue(1L);
+ numParseErrors.inc();
LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
}
}