You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:15 UTC
[2/4] beam git commit: Removing Aggregators from runner-specific examples and tests
Removing Aggregators from runner-specific examples and tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fe11d12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fe11d12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fe11d12
Branch: refs/heads/master
Commit: 1fe11d12404b53d978e70ee2c7c37582dcad10c9
Parents: 904b413
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:03:27 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 9 +-
.../core/DoFnDelegatingAggregatorTest.java | 144 --------
.../beam/runners/core/OldDoFnContextTest.java | 72 ----
.../apache/beam/runners/core/OldDoFnTest.java | 32 --
.../beam/runners/direct/DirectRunner.java | 3 -
.../dataflow/DataflowPipelineJobTest.java | 337 -------------------
.../beam/runners/spark/examples/WordCount.java | 13 +-
8 files changed, 11 insertions(+), 601 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 4c82f46..e6e3a92 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,7 +95,7 @@ public class DebuggingWordCount {
* in a dashboard, etc.
*/
private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
- private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords");
+ private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index a1713ac..b980715 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -30,15 +30,15 @@ import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -67,13 +67,12 @@ public class WordCountTest {
static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
+ private final Counter emptyLines = Metrics.counter("main", "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
+ emptyLines.inc(1);
}
// Split the line into words.
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
deleted file mode 100644
index b44e8a4..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DelegatingAggregator;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DelegatingAggregator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnDelegatingAggregatorTest {
-
- @Mock
- private Aggregator<Long, Long> delegate;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testAddValueWithoutDelegateThrowsException() {
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("cannot be called");
- thrown.expectMessage("DoFn");
-
- aggregator.addValue(21.2);
- }
-
- @Test
- public void testSetDelegateThenAddValueCallsDelegate() {
- String name = "agg";
- CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Long, Long> aggregator =
- (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
-
- aggregator.setDelegate(delegate);
-
- aggregator.addValue(12L);
-
- verify(delegate).addValue(12L);
- }
-
- @Test
- public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- @SuppressWarnings("unchecked")
- Aggregator<Double, Double> secondDelegate =
- mock(Aggregator.class, "secondDelegate");
-
- aggregator.setDelegate(aggregator);
- aggregator.setDelegate(secondDelegate);
-
- aggregator.addValue(2.25);
-
- verify(secondDelegate).addValue(2.25);
- verify(delegate, never()).addValue(anyLong());
- }
-
- @Test
- public void testGetNameReturnsName() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- }
-
- @Test
- public void testGetCombineFnReturnsCombineFn() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @SuppressWarnings("unchecked")
- private static <T> CombineFn<T, ?, T> mockCombineFn(
- @SuppressWarnings("unused") Class<T> clazz) {
- return mock(CombineFn.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
deleted file mode 100644
index a1cd49d..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link OldDoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnContextTest {
-
- @Mock
- private Aggregator<Long, Long> agg;
-
- private OldDoFn<Object, Object> fn;
- private OldDoFn<Object, Object>.Context context;
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
-
- // Need to be real objects to call the constructor, and to reference the
- // outer instance of OldDoFn
- NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
- OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
- fn = spy(noOpFn);
- context = spy(noOpContext);
- }
-
- @Test
- public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
- Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
- Aggregator<Long, Long> delegateAggregator =
- fn.createAggregator("test", combiner);
-
- when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
- context.setupDelegateAggregators();
- delegateAggregator.addValue(1L);
-
- verify(agg).addValue(1L);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
index 425de07..d6838e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -19,14 +19,11 @@ package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -50,19 +47,6 @@ public class OldDoFnTest implements Serializable {
public transient ExpectedException thrown = ExpectedException.none();
@Test
- public void testCreateAggregatorWithCombinerSucceeds() {
- String name = "testAggregator";
- Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @Test
public void testCreateAggregatorWithNullNameThrowsException() {
thrown.expect(NullPointerException.class);
thrown.expectMessage("name cannot be null");
@@ -114,22 +98,6 @@ public class OldDoFnTest implements Serializable {
}
@Test
- public void testCreateAggregatorsWithDifferentNamesSucceeds() {
- String nameOne = "testAggregator";
- String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- Aggregator<Double, Double> aggregatorOne =
- doFn.createAggregator(nameOne, combiner);
- Aggregator<Double, Double> aggregatorTwo =
- doFn.createAggregator(nameTwo, combiner);
-
- assertNotEquals(aggregatorOne, aggregatorTwo);
- }
-
- @Test
public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
OldDoFn<String, String> fn = new OldDoFn<String, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 45a04a7..77ec68f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,8 +34,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 9cab5e8..59315a7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -19,13 +19,8 @@ package org.apache.beam.runners.dataflow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
@@ -38,36 +33,22 @@ import static org.mockito.Mockito.when;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
-import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.GetMetrics;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSetMultimap;
import java.io.IOException;
-import java.math.BigDecimal;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.values.PInput;
@@ -367,323 +348,6 @@ public class DataflowPipelineJobTest {
DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
}
- @Test
- public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue()
- throws IOException, AggregatorRetrievalException {
- Aggregator<?, ?> aggregator = mock(Aggregator.class);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValues(), empty());
- }
-
- @Test
- public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue()
- throws IOException, AggregatorRetrievalException {
- Aggregator<?, ?> aggregator = mock(Aggregator.class);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- jobMetrics.setMetrics(null);
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<?> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValues(), empty());
- }
-
- @Test
- public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- MetricUpdate update = new MetricUpdate();
- long stepValue = 1234L;
- update.setScalar(new BigDecimal(stepValue));
-
- MetricStructuredName structuredName = new MetricStructuredName();
- structuredName.setName(aggregatorName);
- structuredName.setContext(ImmutableMap.of("step", stepName));
- update.setName(structuredName);
-
- jobMetrics.setMetrics(ImmutableList.of(update));
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
- assertThat(values.getValuesAtSteps().size(), equalTo(1));
- assertThat(values.getValues(), contains(stepValue));
- assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue)));
- }
-
- @Test
- public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
-
- Pipeline p = Pipeline.create(options);
-
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform, p);
-
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
- String otherStepName = "s88";
- String otherFullName = "Spam/Ham/Eggs";
- AppliedPTransform<?, ?, ?> otherAppliedTransform =
- appliedPTransform(otherFullName, otherTransform, p);
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
- aggregator, pTransform, aggregator, otherTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
- appliedTransform, stepName, otherAppliedTransform, otherStepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- MetricUpdate updateOne = new MetricUpdate();
- long stepValue = 1234L;
- updateOne.setScalar(new BigDecimal(stepValue));
-
- MetricStructuredName structuredNameOne = new MetricStructuredName();
- structuredNameOne.setName(aggregatorName);
- structuredNameOne.setContext(ImmutableMap.of("step", stepName));
- updateOne.setName(structuredNameOne);
-
- MetricUpdate updateTwo = new MetricUpdate();
- long stepValueTwo = 1024L;
- updateTwo.setScalar(new BigDecimal(stepValueTwo));
-
- MetricStructuredName structuredNameTwo = new MetricStructuredName();
- structuredNameTwo.setName(aggregatorName);
- structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName));
- updateTwo.setName(structuredNameTwo);
-
- jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
- assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo));
- assertThat(values.getValuesAtSteps().size(), equalTo(2));
- assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo));
- assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo)));
- }
-
- @Test
- public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- JobMetrics jobMetrics = new JobMetrics();
- when(getMetrics.execute()).thenReturn(jobMetrics);
-
- MetricUpdate ignoredUpdate = new MetricUpdate();
- ignoredUpdate.setScalar(null);
-
- MetricStructuredName ignoredName = new MetricStructuredName();
- ignoredName.setName("ignoredAggregator.elementCount.out0");
- ignoredName.setContext(null);
- ignoredUpdate.setName(ignoredName);
-
- jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
-
- assertThat(values.getValuesAtSteps().entrySet(), empty());
- assertThat(values.getValues(), empty());
- }
-
- @Test
- public void testGetAggregatorValuesWithUnusedAggregatorThrowsException()
- throws AggregatorRetrievalException {
- Aggregator<?, ?> aggregator = mock(Aggregator.class);
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
-
- DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("not used in this pipeline");
- job.getAggregatorValues(aggregator);
- }
-
- @Test
- public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
- throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
- String aggregatorName = "agg";
- Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
- @SuppressWarnings("unchecked")
- PTransform<PInput, POutput> pTransform = mock(PTransform.class);
- String stepName = "s1";
- String fullName = "Foo/Bar/Baz";
- AppliedPTransform<?, ?, ?> appliedTransform =
- appliedPTransform(fullName, pTransform, Pipeline.create(options));
-
- DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
- ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
- ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
-
- GetMetrics getMetrics = mock(GetMetrics.class);
- when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
- IOException cause = new IOException();
- when(getMetrics.execute()).thenThrow(cause);
-
- Get getState = mock(Get.class);
- when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
- Job modelJob = new Job();
- when(getState.execute()).thenReturn(modelJob);
- modelJob.setCurrentState(State.RUNNING.toString());
-
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
-
- thrown.expect(AggregatorRetrievalException.class);
- thrown.expectCause(is(cause));
- thrown.expectMessage(aggregator.toString());
- thrown.expectMessage("when retrieving Aggregator values for");
- job.getAggregatorValues(aggregator);
- }
-
- private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> {
- private final CombineFn<InT, ?, OutT> combineFn;
- private final String name;
-
- public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) {
- this.combineFn = combineFn;
- this.name = name;
- }
-
- @Override
- public void addValue(InT value) {
- throw new AssertionError();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<InT, ?, OutT> getCombineFn() {
- return combineFn;
- }
- }
-
private AppliedPTransform<?, ?, ?> appliedPTransform(
String fullName, PTransform<PInput, POutput> transform, Pipeline p) {
PInput input = mock(PInput.class);
@@ -696,7 +360,6 @@ public class DataflowPipelineJobTest {
p);
}
-
private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
private long fastNanoTime;
http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index da14ee2..32caa9a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -19,18 +19,18 @@ package org.apache.beam.runners.spark.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -44,14 +44,13 @@ public class WordCount {
* of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
* pipeline.
*/
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
+ public static class ExtractWordsFn extends DoFn<String, String> {
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
+ emptyLines.inc();
}
// Split the line into words.