You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/25 21:03:19 UTC

[beam] branch master updated: Merge pull request #8095 from Ardagan/ElementCountJava

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 956a244  Merge pull request #8095 from Ardagan/ElementCountJava
956a244 is described below

commit 956a2449a7b7a71705d4ff2afbd4d3a05e9193cb
Author: Mikhail Gryzykhin <12...@users.noreply.github.com>
AuthorDate: Mon Mar 25 14:03:07 2019 -0700

    Merge pull request #8095 from Ardagan/ElementCountJava
    
    [BEAM-6824] Add Element count transformer for Java worker FnApi (#8095)
---
 .../dataflow/worker/BatchDataflowWorker.java       |   1 +
 .../worker/BeamFnMapTaskExecutorFactory.java       |  14 +++
 .../worker/fn/control/BeamFnMapTaskExecutor.java   |   3 +-
 ...ntMonitoringInfoToCounterUpdateTransformer.java | 113 ++++++++++++++++++++
 ...piMonitoringInfoToCounterUpdateTransformer.java |   8 +-
 .../control/RegisterAndProcessBundleOperation.java |   8 ++
 .../beam/runners/dataflow/worker/graph/Nodes.java  |   8 +-
 .../worker/graph/RegisterNodeFunction.java         |  16 ++-
 .../fn/control/BeamFnMapTaskExecutorTest.java      |   4 +
 ...nitoringInfoToCounterUpdateTransformerTest.java | 116 +++++++++++++++++++++
 .../RegisterAndProcessBundleOperationTest.java     |  11 ++
 .../runners/dataflow/worker/graph/NodesTest.java   |  18 ++--
 12 files changed, 309 insertions(+), 11 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 0c22f09..42b6c45 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -348,6 +348,7 @@ public class BatchDataflowWorker implements Closeable {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Network as Graphviz .dot: {}", Networks.toDot(network));
         }
+
         worker =
             mapTaskExecutorFactory.create(
                 sdkWorkerHarness.getControlClientHandler(),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index f54c207..10e833f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -427,6 +427,19 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
               entry.getKey(), executionContext.getStepContext(operationContext));
         }
 
+        ImmutableMap.Builder<String, NameContext> pcollectionIdToNameContext =
+            ImmutableMap.builder();
+        for (Map.Entry<String, NameContext> entry :
+            input.getPCollectionToPartialNameContextMap().entrySet()) {
+          pcollectionIdToNameContext.put(
+              entry.getKey(),
+              NameContext.create(
+                  stageName,
+                  entry.getValue().originalName(),
+                  entry.getValue().systemName(),
+                  entry.getValue().userName()));
+        }
+
         ImmutableMap<String, DataflowOperationContext> ptransformIdToOperationContexts =
             ptransformIdToOperationContextBuilder.build();
 
@@ -448,6 +461,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
                 ptransformIdToStepContext.build(),
                 ptransformIdToSideInputReaders,
                 ptransformIdToSideInputIdToPCollectionView,
+                pcollectionIdToNameContext.build(),
                 // TODO: Set NameContext properly for these operations.
                 executionContext.createOperationContext(
                     NameContext.create(stageName, stageName, stageName, stageName))));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index 1baf1d2..f88da99 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -402,7 +402,8 @@ public class BeamFnMapTaskExecutor extends DataflowMapTaskExecutor {
     private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
       final MonitoringInfoToCounterUpdateTransformer monitoringInfoToCounterUpdateTransformer =
           new FnApiMonitoringInfoToCounterUpdateTransformer(
-              bundleProcessOperation.getPtransformIdToUserStepContext());
+              this.bundleProcessOperation.getPtransformIdToUserStepContext(),
+              this.bundleProcessOperation.getPCollectionIdToNameContext());
 
       counterUpdates =
           monitoringInfos.stream()
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
new file mode 100644
index 0000000..f4c8565
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import com.google.api.services.dataflow.model.NameAndKind;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Transforms ElementCount MonitoringInfos into Dataflow CounterUpdates. */
+public class ElementCountMonitoringInfoToCounterUpdateTransformer
+    implements MonitoringInfoToCounterUpdateTransformer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ElementCountMonitoringInfoToCounterUpdateTransformer.class);
+
+  private final SpecMonitoringInfoValidator specValidator;
+  private final Map<String, NameContext> pcollectionIdToNameContext;
+  private static final String SUPPORTED_URN = "beam:metric:element_count:v1";
+
+  /**
+   * @param specValidator SpecMonitoringInfoValidator to utilize for default validation.
+   * @param pcollectionIdToNameContext This mapping is utilized to generate DFE CounterUpdate name.
+   */
+  public ElementCountMonitoringInfoToCounterUpdateTransformer(
+      SpecMonitoringInfoValidator specValidator,
+      Map<String, NameContext> pcollectionIdToNameContext) {
+    this.specValidator = specValidator;
+    this.pcollectionIdToNameContext = pcollectionIdToNameContext;
+  }
+
+  /**
+   * Validates provided monitoring info against specs and common safety checks.
+   *
+   * @param monitoringInfo to validate.
+   * @return Optional.empty() if all validation checks pass; Optional with error text otherwise.
+   * @throws RuntimeException if received unexpected urn.
+   */
+  protected Optional<String> validate(MonitoringInfo monitoringInfo) {
+    Optional<String> validatorResult = specValidator.validate(monitoringInfo);
+    if (validatorResult.isPresent()) {
+      return validatorResult;
+    }
+
+    String urn = monitoringInfo.getUrn();
+    if (!urn.equals(SUPPORTED_URN)) {
+      throw new RuntimeException(String.format("Received unexpected counter urn: %s", urn));
+    }
+
+    // TODO(migryz): extract and utilize pcollection label from beam_fn_api.proto
+    if (!pcollectionIdToNameContext.containsKey(monitoringInfo.getLabelsMap().get("PCOLLECTION"))) {
+      return Optional.of(
+          "Encountered ElementCount MonitoringInfo with unknown PCollectionId: "
+              + monitoringInfo.toString());
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Generates CounterUpdate to send to DFE based on ElementCount MonitoringInfo.
+   *
+   * @param monitoringInfo Monitoring info to transform.
+   * @return CounterUpdate generated from monitoringInfo, or null if validation fails.
+   */
+  @Override
+  public CounterUpdate transform(MonitoringInfo monitoringInfo) {
+    Optional<String> validationResult = validate(monitoringInfo);
+    if (validationResult.isPresent()) {
+      LOG.info(validationResult.get());
+      return null;
+    }
+
+    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+
+    final String pcollectionId = monitoringInfo.getLabelsMap().get("PCOLLECTION");
+    final String pcollectionName = pcollectionIdToNameContext.get(pcollectionId).userName();
+
+    String counterName = pcollectionName + "-ElementCount";
+    NameAndKind name = new NameAndKind().setName(counterName).setKind("SUM");
+
+    return new CounterUpdate()
+        .setNameAndKind(name)
+        .setCumulative(true)
+        .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+  }
+
+  /** @return the URN that this transformer can convert to CounterUpdates. */
+  public static String getSupportedUrn() {
+    return SUPPORTED_URN;
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 8990e3c..b8f8a3b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -37,7 +38,8 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
   final Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers = new HashMap<>();
 
   public FnApiMonitoringInfoToCounterUpdateTransformer(
-      Map<String, DataflowStepContext> stepContextMap) {
+      Map<String, DataflowStepContext> stepContextMap,
+      Map<String, NameContext> sdkPCollectionIdToNameContext) {
     SpecMonitoringInfoValidator specValidator = new SpecMonitoringInfoValidator();
     this.userCounterTransformer =
         new UserMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
@@ -47,6 +49,10 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
     for (String urn : msecTransformer.getSupportedUrns()) {
       this.counterTransformers.put(urn, msecTransformer);
     }
+    this.counterTransformers.put(
+        ElementCountMonitoringInfoToCounterUpdateTransformer.getSupportedUrn(),
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            specValidator, sdkPCollectionIdToNameContext));
   }
 
   /** Allows for injection of user and generic counter transformers for more convenient testing. */
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 6518555..7799e6f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -51,6 +51,7 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.dataflow.worker.ByteStringCoder;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
@@ -100,6 +101,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
   private final Table<String, String, PCollectionView<?>>
       ptransformIdToSideInputIdToPCollectionView;
   private final ConcurrentHashMap<StateKey, BagState<ByteString>> userStateData;
+  private final Map<String, NameContext> pcollectionIdToNameContext;
 
   private @Nullable CompletionStage<InstructionResponse> registerFuture;
   private @Nullable CompletionStage<InstructionResponse> processBundleResponse;
@@ -118,6 +120,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
       Map<String, DataflowStepContext> ptransformIdToSystemStepContext,
       Map<String, SideInputReader> ptransformIdToSideInputReader,
       Table<String, String, PCollectionView<?>> ptransformIdToSideInputIdToPCollectionView,
+      Map<String, NameContext> pcollectionIdToNameContext,
       OperationContext context) {
     super(EMPTY_RECEIVERS, context);
     this.idGenerator = idGenerator;
@@ -126,6 +129,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
     this.registerRequest = registerRequest;
     this.ptransformIdToSideInputReader = ptransformIdToSideInputReader;
     this.ptransformIdToSideInputIdToPCollectionView = ptransformIdToSideInputIdToPCollectionView;
+    this.pcollectionIdToNameContext = pcollectionIdToNameContext;
     ImmutableMap.Builder<String, DataflowStepContext> userStepContextsMap = ImmutableMap.builder();
     for (Map.Entry<String, DataflowStepContext> entry :
         ptransformIdToSystemStepContext.entrySet()) {
@@ -301,6 +305,10 @@ public class RegisterAndProcessBundleOperation extends Operation {
     }
   }
 
+  public Map<String, NameContext> getPCollectionIdToNameContext() {
+    return this.pcollectionIdToNameContext;
+  }
+
   public Map<String, DataflowStepContext> getPtransformIdToUserStepContext() {
     return ptransformIdToUserStepContext;
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 7a2d873..139b274 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -289,14 +289,16 @@ public class Nodes {
         BeamFnApi.RegisterRequest request,
         Map<String, NameContext> ptransformIdToPartialNameContextMap,
         Map<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfoMap,
-        Map<String, Iterable<PCollectionView<?>>> pTransformIdToPCollectionViewMap) {
+        Map<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViewMap,
+        Map<String, NameContext> pcollectionToPartialNameContextMap) {
       checkNotNull(request);
       checkNotNull(ptransformIdToPartialNameContextMap);
       return new AutoValue_Nodes_RegisterRequestNode(
           request,
           ptransformIdToPartialNameContextMap,
           ptransformIdToSideInputInfoMap,
-          pTransformIdToPCollectionViewMap);
+          ptransformIdToPCollectionViewMap,
+          pcollectionToPartialNameContextMap);
     }
 
     public abstract BeamFnApi.RegisterRequest getRegisterRequest();
@@ -307,6 +309,8 @@ public class Nodes {
 
     public abstract Map<String, Iterable<PCollectionView<?>>> getPTransformIdToPCollectionViewMap();
 
+    public abstract Map<String, NameContext> getPCollectionToPartialNameContextMap();
+
     @Override
     public String toString() {
       // The request may be very large.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index 6dea449..c5e7d13 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -219,7 +219,11 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
         ImmutableMap.builder();
     ImmutableMap.Builder<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViews =
         ImmutableMap.builder();
+    ImmutableMap.Builder<String, NameContext> pcollectionIdToNameContexts = ImmutableMap.builder();
 
+    // For each instruction output node:
+    // 1. Generate new Coder and register it with SDKComponents and ProcessBundleDescriptor.
+    // 2. Generate new PCollectionId and register it with ProcessBundleDescriptor.
     for (InstructionOutputNode node :
         Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
       InstructionOutput instructionOutput = node.getInstructionOutput();
@@ -259,6 +263,8 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
             e);
       }
 
+      // Generate new PCollection ID and map it to relevant node.
+      // Will later be used to fill PTransform inputs/outputs information.
       String pcollectionId = "generatedPcollection" + idGenerator.getId();
       processBundleDescriptor.putPcollections(
           pcollectionId,
@@ -267,6 +273,13 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
               .setWindowingStrategyId(fakeWindowingStrategyId)
               .build());
       nodesToPCollections.put(node, pcollectionId);
+      pcollectionIdToNameContexts.put(
+          pcollectionId,
+          NameContext.create(
+              null,
+              instructionOutput.getOriginalName(),
+              instructionOutput.getSystemName(),
+              instructionOutput.getName()));
     }
     processBundleDescriptor.putAllCoders(sdkComponents.toComponents().getCodersMap());
 
@@ -430,7 +443,8 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
         RegisterRequest.newBuilder().addProcessBundleDescriptor(processBundleDescriptor).build(),
         ptransformIdToNameContexts.build(),
         ptransformIdToSideInputInfos.build(),
-        ptransformIdToPCollectionViews.build());
+        ptransformIdToPCollectionViews.build(),
+        pcollectionIdToNameContexts.build());
   }
 
   /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index 02ac1ba..38e3d7a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -184,6 +184,7 @@ public class BeamFnMapTaskExecutorTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     BeamFnMapTaskExecutor mapTaskExecutor =
@@ -287,6 +288,7 @@ public class BeamFnMapTaskExecutorTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     BeamFnMapTaskExecutor mapTaskExecutor =
@@ -396,6 +398,7 @@ public class BeamFnMapTaskExecutorTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     BeamFnMapTaskExecutor mapTaskExecutor =
@@ -519,6 +522,7 @@ public class BeamFnMapTaskExecutorTest {
             stepContextMap,
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     BeamFnMapTaskExecutor mapTaskExecutor =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformerTest.java
new file mode 100644
index 0000000..a45463b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class ElementCountMonitoringInfoToCounterUpdateTransformerTest {
+
+  @Rule public final ExpectedException exception = ExpectedException.none();
+
+  @Mock private SpecMonitoringInfoValidator mockSpecValidator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfSpecValidationFails() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    Optional<String> error = Optional.of("Error text");
+    when(mockSpecValidator.validate(any())).thenReturn(error);
+    assertEquals(null, testObject.transform(null));
+  }
+
+  @Test
+  public void testTransformThrowsIfMonitoringInfoWithWrongUrnPrefixReceived() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder().setUrn("beam:user:metric:element_count:v1").build();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    exception.expect(RuntimeException.class);
+    testObject.transform(monitoringInfo);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfMonitoringInfoWithUnknownPCollectionLabelPresent() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:element_count:v1")
+            .putLabels("PCOLLECTION", "anyValue")
+            .build();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformReturnsValidCounterUpdateWhenValidMonitoringInfoReceived() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    pcollectionNameMapping.put(
+        "anyValue",
+        NameContext.create("anyStageName", "anyOriginName", "anySystemName", "transformedValue"));
+
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:element_count:v1")
+            .putLabels("PCOLLECTION", "anyValue")
+            .build();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+    assertNotEquals(null, result);
+
+    assertEquals(
+        "{cumulative=true, integer={highBits=0, lowBits=0}, "
+            + "nameAndKind={kind=SUM, "
+            + "name=transformedValue-ElementCount}}",
+        result.toString());
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 16d4d11..71c36df 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -174,6 +174,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext)
         .supportsRestart();
   }
@@ -215,6 +216,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     // Ensure that the first time we start we send the register and process bundle requests
@@ -320,6 +322,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     operation.start();
@@ -423,6 +426,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     operation.start();
@@ -493,6 +497,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     operation.start();
@@ -613,6 +618,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of("testPTransformId", mockStepContext),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     operation.start();
@@ -740,6 +746,7 @@ public class RegisterAndProcessBundleOperationTest {
                     FullWindowedValueCoder.of(
                         KvCoder.of(ByteArrayCoder.of(), StringUtf8Coder.of()),
                         GlobalWindow.Coder.INSTANCE))),
+            ImmutableMap.of(),
             mockContext);
 
     operation.start();
@@ -791,6 +798,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
     abortReference.set(operation::abort);
     operation.start();
@@ -838,6 +846,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
     abortReference.set(operation::abort);
     operation.start();
@@ -875,6 +884,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     assertEquals(
@@ -897,6 +907,7 @@ public class RegisterAndProcessBundleOperationTest {
             ImmutableMap.of(),
             ImmutableMap.of(),
             ImmutableTable.of(),
+            ImmutableMap.of(),
             mockContext);
 
     operation.getProcessBundleInstructionId(); // this generates and caches bundleId
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
index c5a83f9..5a71d6d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
@@ -118,23 +118,29 @@ public class NodesTest {
                         GlobalWindow.Coder.INSTANCE))));
     assertSame(
         param,
-        RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+        RegisterRequestNode.create(
+                param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
             .getRegisterRequest());
     assertSame(
         nameContexts,
-        RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+        RegisterRequestNode.create(
+                param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
             .getPTransformIdToPartialNameContextMap());
     assertSame(
         sideInputInfos,
-        RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+        RegisterRequestNode.create(
+                param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
             .getPTransformIdToSideInputInfoMap());
     assertSame(
         pcollectionViews,
-        RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+        RegisterRequestNode.create(
+                param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
             .getPTransformIdToPCollectionViewMap());
     assertNotEquals(
-        RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews),
-        RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews));
+        RegisterRequestNode.create(
+            param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of()),
+        RegisterRequestNode.create(
+            param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of()));
   }
 
   @Test