You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/26 22:07:59 UTC

[1/2] beam git commit: Closes #2613

Repository: beam
Updated Branches:
  refs/heads/master af8ead44e -> 7339882b0


Closes #2613


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7339882b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7339882b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7339882b

Branch: refs/heads/master
Commit: 7339882b0fa756617480a9a50788ccfb7d7b7cb1
Parents: af8ead4 7c17420
Author: bchambers <bc...@google.com>
Authored: Wed Apr 26 10:04:49 2017 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Apr 26 14:41:52 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowAggregatorTransforms.java  |  79 --------------
 .../dataflow/DataflowMetricUpdateExtractor.java | 109 -------------------
 .../beam/runners/dataflow/DataflowMetrics.java  |   6 +-
 .../runners/dataflow/DataflowPipelineJob.java   |  41 ++-----
 .../beam/runners/dataflow/DataflowRunner.java   |  12 +-
 .../dataflow/DataflowPipelineJobTest.java       |  42 +++----
 6 files changed, 32 insertions(+), 257 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Remove aggregators from Dataflow runner

Posted by bc...@apache.org.
Remove aggregators from Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c17420d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c17420d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c17420d

Branch: refs/heads/master
Commit: 7c17420d9d2714bf7929a7cb3dee09787e7359a6
Parents: af8ead4
Author: Pablo <pa...@google.com>
Authored: Thu Apr 20 11:08:10 2017 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Apr 26 14:41:52 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowAggregatorTransforms.java  |  79 --------------
 .../dataflow/DataflowMetricUpdateExtractor.java | 109 -------------------
 .../beam/runners/dataflow/DataflowMetrics.java  |   6 +-
 .../runners/dataflow/DataflowPipelineJob.java   |  41 ++-----
 .../beam/runners/dataflow/DataflowRunner.java   |  12 +-
 .../dataflow/DataflowPipelineJobTest.java       |  42 +++----
 6 files changed, 32 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
deleted file mode 100755
index 0198cca..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- */
-class DataflowAggregatorTransforms {
-  private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
-  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
-  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
-  public DataflowAggregatorTransforms(
-      Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
-      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
-    this.aggregatorTransforms = aggregatorTransforms;
-    appliedStepNames = HashBiMap.create(transformStepNames);
-
-    transformAppliedTransforms = HashMultimap.create();
-    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
-      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
-    }
-  }
-
-  /**
-   * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
-   */
-  public boolean contains(Aggregator<?, ?> aggregator) {
-    return aggregatorTransforms.containsKey(aggregator);
-  }
-
-  /**
-   * Gets the step names in which the {@link Aggregator} is used.
-   */
-  public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
-    Collection<String> names = new HashSet<>();
-    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
-    for (PTransform<?, ?> transform : transforms) {
-      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
-        names.add(appliedStepNames.get(applied));
-      }
-    }
-    return names;
-  }
-
-  /**
-   * Gets the {@link PTransform} that was assigned the provided step name.
-   */
-  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
-    return appliedStepNames.inverse().get(stepName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
deleted file mode 100755
index f725c46..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-final class DataflowMetricUpdateExtractor {
-  private static final String STEP_NAME_CONTEXT_KEY = "step";
-  private static final String IS_TENTATIVE_KEY = "tentative";
-
-  private DataflowMetricUpdateExtractor() {
-    // Do not instantiate.
-  }
-
-  /**
-   * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
-   * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
-   * MetricUpdate MetricUpdates}.
-   */
-  public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
-      DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
-    Map<String, OutputT> results = new HashMap<>();
-    if (metricUpdates == null) {
-      return results;
-    }
-
-    String aggregatorName = aggregator.getName();
-    Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
-    for (MetricUpdate metricUpdate : metricUpdates) {
-      MetricStructuredName metricStructuredName = metricUpdate.getName();
-      Map<String, String> context = metricStructuredName.getContext();
-      if (metricStructuredName.getName().equals(aggregatorName) && context != null
-          && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
-        AppliedPTransform<?, ?, ?> transform =
-            aggregatorTransforms.getAppliedTransformForStepName(
-                context.get(STEP_NAME_CONTEXT_KEY));
-        String fullName = transform.getFullName();
-        // Prefer the tentative (fresher) value if it exists.
-        if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
-          results.put(fullName, toValue(aggregator, metricUpdate));
-        }
-      }
-    }
-
-    return results;
-
-  }
-
-  private static <OutputT> OutputT toValue(
-      Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
-    CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
-    Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
-    if (outputType.equals(Long.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
-      return asLong;
-    }
-    if (outputType.equals(Integer.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
-      return asInt;
-    }
-    if (outputType.equals(Double.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
-      return asDouble;
-    }
-    throw new UnsupportedOperationException(
-        "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
-  }
-
-  private static Number toNumber(MetricUpdate update) {
-    if (update.getScalar() instanceof Number) {
-      return (Number) update.getScalar();
-    }
-    throw new IllegalArgumentException(
-        "Metric Update " + update + " does not have a numeric scalar");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index d4d29dd..7633a56 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -79,9 +79,9 @@ class DataflowMetrics extends MetricResults {
   private MetricKey metricHashKey(
       com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
     String fullStepName = metricUpdate.getName().getContext().get("step");
-    fullStepName = (dataflowPipelineJob.aggregatorTransforms != null
-        ? dataflowPipelineJob.aggregatorTransforms
-            .getAppliedTransformForStepName(fullStepName).getFullName() : fullStepName);
+    fullStepName = (dataflowPipelineJob.transformStepNames != null
+        ? dataflowPipelineJob.transformStepNames
+        .inverse().get(fullStepName).getFullName() : fullStepName);
     return MetricKey.create(
         fullStepName,
         MetricName.named(

http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 0399ada..d464206 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -26,9 +27,11 @@ import com.google.api.client.util.NanoClock;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.List;
@@ -42,7 +45,7 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -89,7 +92,7 @@ public class DataflowPipelineJob implements PipelineResult {
   @Nullable
   private DataflowPipelineJob replacedByJob = null;
 
-  protected DataflowAggregatorTransforms aggregatorTransforms;
+  protected BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames;
 
   /**
    * The Metric Updates retrieved after the job was in a terminal state.
@@ -126,16 +129,17 @@ public class DataflowPipelineJob implements PipelineResult {
    *
    * @param jobId the job id
    * @param dataflowOptions used to configure the client for the Dataflow Service
-   * @param aggregatorTransforms a mapping from aggregators to PTransforms
+   * @param transformStepNames a mapping from AppliedPTransforms to Step Names
    */
   public DataflowPipelineJob(
       String jobId,
       DataflowPipelineOptions dataflowOptions,
-      DataflowAggregatorTransforms aggregatorTransforms) {
+      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
     this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
-    this.aggregatorTransforms = aggregatorTransforms;
+    this.transformStepNames = HashBiMap.create(
+        firstNonNull(transformStepNames, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()));
     this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
   }
 
@@ -456,7 +460,7 @@ public class DataflowPipelineJob implements PipelineResult {
         if (currentState.isTerminal()) {
           terminalState = currentState;
           replacedByJob = new DataflowPipelineJob(
-              job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
+              job.getReplacedByJobId(), dataflowOptions, transformStepNames);
         }
         return job;
       } catch (IOException exn) {
@@ -488,27 +492,4 @@ public class DataflowPipelineJob implements PipelineResult {
   public MetricResults metrics() {
     return dataflowMetrics;
   }
-
-  private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
-      throws IOException {
-    if (aggregatorTransforms.contains(aggregator)) {
-      List<MetricUpdate> metricUpdates;
-      if (terminalMetricUpdates != null) {
-        metricUpdates = terminalMetricUpdates;
-      } else {
-        boolean terminal = getState().isTerminal();
-        JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId);
-        metricUpdates = jobMetrics.getMetrics();
-        if (terminal && jobMetrics.getMetrics() != null) {
-          terminalMetricUpdates = metricUpdates;
-        }
-      }
-
-      return DataflowMetricUpdateExtractor.fromMetricUpdates(
-          aggregator, aggregatorTransforms, metricUpdates);
-    } else {
-      throw new IllegalArgumentException(
-          "Aggregator " + aggregator + " is not used in this pipeline");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index b37d34a..63c2191 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -49,7 +49,6 @@ import java.net.URLClassLoader;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -97,7 +96,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -620,18 +618,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       throw new RuntimeException("Failed to create a workflow job", e);
     }
 
-    // Obtain all of the extractors from the PTransforms used in the pipeline so the
-    // DataflowPipelineJob has access to them.
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        pipeline.getAggregatorSteps();
-
-    DataflowAggregatorTransforms aggregatorTransforms =
-        new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames());
-
     // Use a raw client for post-launch monitoring, as status calls may fail
     // regularly and need not be retried automatically.
     DataflowPipelineJob dataflowPipelineJob =
-        new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms);
+        new DataflowPipelineJob(jobResult.getId(), options, jobSpecification.getStepNames());
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/beam/blob/7c17420d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 59315a7..9dd2ab1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -35,6 +35,7 @@ import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
 import com.google.api.services.dataflow.model.Job;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.Collections;
@@ -154,11 +155,10 @@ public class DataflowPipelineJobTest {
     when(mockMessages.list(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(listRequest);
     when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest);
     when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     State state = job.waitUntilFinish(
         Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
@@ -177,11 +177,10 @@ public class DataflowPipelineJobTest {
 
     when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
   }
@@ -245,11 +244,10 @@ public class DataflowPipelineJobTest {
 
     when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenThrow(IOException.class);
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
@@ -266,11 +264,10 @@ public class DataflowPipelineJobTest {
 
     when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenThrow(IOException.class);
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
     assertEquals(null, state);
@@ -289,13 +286,11 @@ public class DataflowPipelineJobTest {
     when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
-
     FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
     long startTime = clock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
     assertEquals(null, state);
@@ -315,11 +310,9 @@ public class DataflowPipelineJobTest {
     when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
-
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     assertEquals(
         State.RUNNING,
@@ -333,11 +326,10 @@ public class DataflowPipelineJobTest {
 
     when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenThrow(IOException.class);
-    DataflowAggregatorTransforms dataflowAggregatorTransforms =
-        mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options,
+            ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     long startTime = fastClock.nanoTime();
     assertEquals(