You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/26 22:08:00 UTC
[2/2] beam git commit: Remove aggregators from Dataflow runner
Remove aggregators from Dataflow runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c17420d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c17420d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c17420d
Branch: refs/heads/master
Commit: 7c17420d9d2714bf7929a7cb3dee09787e7359a6
Parents: af8ead4
Author: Pablo <pa...@google.com>
Authored: Thu Apr 20 11:08:10 2017 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Apr 26 14:41:52 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowAggregatorTransforms.java | 79 --------------
.../dataflow/DataflowMetricUpdateExtractor.java | 109 -------------------
.../beam/runners/dataflow/DataflowMetrics.java | 6 +-
.../runners/dataflow/DataflowPipelineJob.java | 41 ++-----
.../beam/runners/dataflow/DataflowRunner.java | 12 +-
.../dataflow/DataflowPipelineJobTest.java | 42 +++----
6 files changed, 32 insertions(+), 257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
deleted file mode 100755
index 0198cca..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- */
-class DataflowAggregatorTransforms {
- private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
- private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
- private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
- public DataflowAggregatorTransforms(
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
- Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
- this.aggregatorTransforms = aggregatorTransforms;
- appliedStepNames = HashBiMap.create(transformStepNames);
-
- transformAppliedTransforms = HashMultimap.create();
- for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
- transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
- }
- }
-
- /**
- * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
- */
- public boolean contains(Aggregator<?, ?> aggregator) {
- return aggregatorTransforms.containsKey(aggregator);
- }
-
- /**
- * Gets the step names in which the {@link Aggregator} is used.
- */
- public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
- Collection<String> names = new HashSet<>();
- Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
- for (PTransform<?, ?> transform : transforms) {
- for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
- names.add(appliedStepNames.get(applied));
- }
- }
- return names;
- }
-
- /**
- * Gets the {@link PTransform} that was assigned the provided step name.
- */
- public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
- return appliedStepNames.inverse().get(stepName);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
deleted file mode 100755
index f725c46..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-final class DataflowMetricUpdateExtractor {
- private static final String STEP_NAME_CONTEXT_KEY = "step";
- private static final String IS_TENTATIVE_KEY = "tentative";
-
- private DataflowMetricUpdateExtractor() {
- // Do not instantiate.
- }
-
- /**
- * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
- * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
- * MetricUpdate MetricUpdates}.
- */
- public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
- DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
- Map<String, OutputT> results = new HashMap<>();
- if (metricUpdates == null) {
- return results;
- }
-
- String aggregatorName = aggregator.getName();
- Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
- for (MetricUpdate metricUpdate : metricUpdates) {
- MetricStructuredName metricStructuredName = metricUpdate.getName();
- Map<String, String> context = metricStructuredName.getContext();
- if (metricStructuredName.getName().equals(aggregatorName) && context != null
- && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
- AppliedPTransform<?, ?, ?> transform =
- aggregatorTransforms.getAppliedTransformForStepName(
- context.get(STEP_NAME_CONTEXT_KEY));
- String fullName = transform.getFullName();
- // Prefer the tentative (fresher) value if it exists.
- if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
- results.put(fullName, toValue(aggregator, metricUpdate));
- }
- }
- }
-
- return results;
-
- }
-
- private static <OutputT> OutputT toValue(
- Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
- CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
- Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
- if (outputType.equals(Long.class)) {
- @SuppressWarnings("unchecked")
- OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
- return asLong;
- }
- if (outputType.equals(Integer.class)) {
- @SuppressWarnings("unchecked")
- OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
- return asInt;
- }
- if (outputType.equals(Double.class)) {
- @SuppressWarnings("unchecked")
- OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
- return asDouble;
- }
- throw new UnsupportedOperationException(
- "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
- }
-
- private static Number toNumber(MetricUpdate update) {
- if (update.getScalar() instanceof Number) {
- return (Number) update.getScalar();
- }
- throw new IllegalArgumentException(
- "Metric Update " + update + " does not have a numeric scalar");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index d4d29dd..7633a56 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -79,9 +79,9 @@ class DataflowMetrics extends MetricResults {
private MetricKey metricHashKey(
com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
String fullStepName = metricUpdate.getName().getContext().get("step");
- fullStepName = (dataflowPipelineJob.aggregatorTransforms != null
- ? dataflowPipelineJob.aggregatorTransforms
- .getAppliedTransformForStepName(fullStepName).getFullName() : fullStepName);
+ fullStepName = (dataflowPipelineJob.transformStepNames != null
+ ? dataflowPipelineJob.transformStepNames
+ .inverse().get(fullStepName).getFullName() : fullStepName);
return MetricKey.create(
fullStepName,
MetricName.named(
http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 0399ada..d464206 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -26,9 +27,11 @@ import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
@@ -42,7 +45,7 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -89,7 +92,7 @@ public class DataflowPipelineJob implements PipelineResult {
@Nullable
private DataflowPipelineJob replacedByJob = null;
- protected DataflowAggregatorTransforms aggregatorTransforms;
+ protected BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames;
/**
* The Metric Updates retrieved after the job was in a terminal state.
@@ -126,16 +129,17 @@ public class DataflowPipelineJob implements PipelineResult {
*
* @param jobId the job id
* @param dataflowOptions used to configure the client for the Dataflow Service
- * @param aggregatorTransforms a mapping from aggregators to PTransforms
+ * @param transformStepNames a mapping from AppliedPTransforms to Step Names
*/
public DataflowPipelineJob(
String jobId,
DataflowPipelineOptions dataflowOptions,
- DataflowAggregatorTransforms aggregatorTransforms) {
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
- this.aggregatorTransforms = aggregatorTransforms;
+ this.transformStepNames = HashBiMap.create(
+ firstNonNull(transformStepNames, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()));
this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
}
@@ -456,7 +460,7 @@ public class DataflowPipelineJob implements PipelineResult {
if (currentState.isTerminal()) {
terminalState = currentState;
replacedByJob = new DataflowPipelineJob(
- job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
+ job.getReplacedByJobId(), dataflowOptions, transformStepNames);
}
return job;
} catch (IOException exn) {
@@ -488,27 +492,4 @@ public class DataflowPipelineJob implements PipelineResult {
public MetricResults metrics() {
return dataflowMetrics;
}
-
- private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
- throws IOException {
- if (aggregatorTransforms.contains(aggregator)) {
- List<MetricUpdate> metricUpdates;
- if (terminalMetricUpdates != null) {
- metricUpdates = terminalMetricUpdates;
- } else {
- boolean terminal = getState().isTerminal();
- JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId);
- metricUpdates = jobMetrics.getMetrics();
- if (terminal && jobMetrics.getMetrics() != null) {
- terminalMetricUpdates = metricUpdates;
- }
- }
-
- return DataflowMetricUpdateExtractor.fromMetricUpdates(
- aggregator, aggregatorTransforms, metricUpdates);
- } else {
- throw new IllegalArgumentException(
- "Aggregator " + aggregator + " is not used in this pipeline");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index b37d34a..63c2191 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -49,7 +49,6 @@ import java.net.URLClassLoader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -97,7 +96,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -620,18 +618,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
throw new RuntimeException("Failed to create a workflow job", e);
}
- // Obtain all of the extractors from the PTransforms used in the pipeline so the
- // DataflowPipelineJob has access to them.
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
- pipeline.getAggregatorSteps();
-
- DataflowAggregatorTransforms aggregatorTransforms =
- new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames());
-
// Use a raw client for post-launch monitoring, as status calls may fail
// regularly and need not be retried automatically.
DataflowPipelineJob dataflowPipelineJob =
- new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms);
+ new DataflowPipelineJob(jobResult.getId(), options, jobSpecification.getStepNames());
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same
http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 59315a7..9dd2ab1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -35,6 +35,7 @@ import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
import com.google.api.services.dataflow.model.Job;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Collections;
@@ -154,11 +155,10 @@ public class DataflowPipelineJobTest {
when(mockMessages.list(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(listRequest);
when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest);
when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
State state = job.waitUntilFinish(
Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
@@ -177,11 +177,10 @@ public class DataflowPipelineJobTest {
when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
when(statusRequest.execute()).thenReturn(statusResponse);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
}
@@ -245,11 +244,10 @@ public class DataflowPipelineJobTest {
when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
when(statusRequest.execute()).thenThrow(IOException.class);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
@@ -266,11 +264,10 @@ public class DataflowPipelineJobTest {
when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
when(statusRequest.execute()).thenThrow(IOException.class);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
assertEquals(null, state);
@@ -289,13 +286,11 @@ public class DataflowPipelineJobTest {
when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
when(statusRequest.execute()).thenReturn(statusResponse);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
-
FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
- DataflowPipelineJob job = new DataflowPipelineJob(
- JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = clock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
assertEquals(null, state);
@@ -315,11 +310,9 @@ public class DataflowPipelineJobTest {
when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
when(statusRequest.execute()).thenReturn(statusResponse);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
-
- DataflowPipelineJob job = new DataflowPipelineJob(
- JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
assertEquals(
State.RUNNING,
@@ -333,11 +326,10 @@ public class DataflowPipelineJobTest {
when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
when(statusRequest.execute()).thenThrow(IOException.class);
- DataflowAggregatorTransforms dataflowAggregatorTransforms =
- mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options,
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = fastClock.nanoTime();
assertEquals(