You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 18:03:06 UTC

[beam] branch master updated: FnApiMonitoringInfoToCounterUpdateTransformer with User counters

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c2e983  FnApiMonitoringInfoToCounterUpdateTransformer with User counters
     new 0a98fce  Merge pull request #8456 from Ardagan/FixUserCounters
5c2e983 is described below

commit 5c2e9839a47921d30c39e7e32126a099fd103c58
Author: Mikhail Gryzykhin <mi...@google.com>
AuthorDate: Wed May 1 11:08:57 2019 -0700

    FnApiMonitoringInfoToCounterUpdateTransformer with User counters
---
 ...piMonitoringInfoToCounterUpdateTransformer.java | 21 ++++------
 ...nitoringInfoToCounterUpdateTransformerTest.java | 49 ++++++++--------------
 2 files changed, 25 insertions(+), 45 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 49058b6..b8f3bf2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -35,19 +35,22 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
 public class FnApiMonitoringInfoToCounterUpdateTransformer
     implements MonitoringInfoToCounterUpdateTransformer {
 
-  final UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer;
-  final UserDistributionMonitoringInfoToCounterUpdateTransformer userDistributionCounterTransformer;
   final Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers = new HashMap<>();
 
   public FnApiMonitoringInfoToCounterUpdateTransformer(
       Map<String, DataflowStepContext> stepContextMap,
       Map<String, NameContext> sdkPCollectionIdToNameContext) {
     SpecMonitoringInfoValidator specValidator = new SpecMonitoringInfoValidator();
-    this.userCounterTransformer =
+
+    UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer =
         new UserMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
+    counterTransformers.put(userCounterTransformer.getSupportedUrnPrefix(), userCounterTransformer);
 
-    this.userDistributionCounterTransformer =
+    UserDistributionMonitoringInfoToCounterUpdateTransformer userDistributionCounterTransformer =
         new UserDistributionMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
+    counterTransformers.put(
+        userDistributionCounterTransformer.getSupportedUrnPrefix(),
+        userDistributionCounterTransformer);
 
     MSecMonitoringInfoToCounterUpdateTransformer msecTransformer =
         new MSecMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
@@ -67,11 +70,7 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
   /** Allows for injection of user and generic counter transformers for more convenient testing. */
   @VisibleForTesting
   public FnApiMonitoringInfoToCounterUpdateTransformer(
-      UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer,
-      UserDistributionMonitoringInfoToCounterUpdateTransformer userDistributionCounterTransformer,
       Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers) {
-    this.userCounterTransformer = userCounterTransformer;
-    this.userDistributionCounterTransformer = userDistributionCounterTransformer;
     this.counterTransformers.putAll(counterTransformers);
   }
 
@@ -79,12 +78,6 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
   @Nullable
   public CounterUpdate transform(MonitoringInfo src) {
     String urn = src.getUrn();
-    if (urn.startsWith(userCounterTransformer.getSupportedUrnPrefix())) {
-      return userCounterTransformer.transform(src);
-    } else if (urn.startsWith(userDistributionCounterTransformer.getSupportedUrnPrefix())) {
-      return this.userDistributionCounterTransformer.transform(src);
-    }
-
     MonitoringInfoToCounterUpdateTransformer transformer = counterTransformers.get(urn);
     return transformer == null ? null : transformer.transform(src);
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
index 1a0f3b1..3dec6e1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 import com.google.api.services.dataflow.model.CounterUpdate;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -34,13 +33,8 @@ import org.mockito.MockitoAnnotations;
 
 public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
 
-  @Mock private UserMonitoringInfoToCounterUpdateTransformer mockUserCounterTransformer;
-
-  @Mock
-  private UserDistributionMonitoringInfoToCounterUpdateTransformer
-      mockUserDistributionCounterTransformer;
-
-  @Mock private UserMonitoringInfoToCounterUpdateTransformer mockGenericTransformer1;
+  @Mock private UserMonitoringInfoToCounterUpdateTransformer mockTransformer2;
+  @Mock private UserMonitoringInfoToCounterUpdateTransformer mockTransformer1;
 
   @Before
   public void setUp() {
@@ -48,22 +42,22 @@ public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
   }
 
   @Test
-  public void testTransformUtilizesUserCounterTransformerForUserCounters() {
-    Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers =
-        Collections.EMPTY_MAP;
+  public void testTransformUtilizesRelevantCounterTransformer() {
+    Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers = new HashMap<>();
+
+    final String validUrn = "urn1";
+    genericTransformers.put(validUrn, mockTransformer1);
+    genericTransformers.put("any:other:urn", mockTransformer2);
+
     FnApiMonitoringInfoToCounterUpdateTransformer testObject =
-        new FnApiMonitoringInfoToCounterUpdateTransformer(
-            mockUserCounterTransformer,
-            mockUserDistributionCounterTransformer,
-            genericTransformers);
+        new FnApiMonitoringInfoToCounterUpdateTransformer(genericTransformers);
 
     CounterUpdate expectedResult = new CounterUpdate();
-    when(mockUserCounterTransformer.transform(any())).thenReturn(expectedResult);
-    when(mockUserCounterTransformer.getSupportedUrnPrefix()).thenReturn("user:prefix:");
+    when(mockTransformer1.transform(any())).thenReturn(expectedResult);
 
     MonitoringInfo monitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn("user:prefix:anyNamespace:anyName")
+            .setUrn(validUrn)
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
             .build();
 
@@ -73,32 +67,25 @@ public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
   }
 
   @Test
-  public void testTransformUtilizesRelevantCounterTransformerForNonUserCounters() {
+  public void testTransformReturnsNullOnUnknownUrn() {
     Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers = new HashMap<>();
-    final String validUrn = "urn1";
-    genericTransformers.put(validUrn, mockGenericTransformer1);
 
-    when(mockUserCounterTransformer.getSupportedUrnPrefix()).thenReturn("invalid:prefix:");
-    when(mockUserDistributionCounterTransformer.getSupportedUrnPrefix())
-        .thenReturn("invalid:prefix2:");
+    genericTransformers.put("beam:metric:user", mockTransformer2);
 
     FnApiMonitoringInfoToCounterUpdateTransformer testObject =
-        new FnApiMonitoringInfoToCounterUpdateTransformer(
-            mockUserCounterTransformer,
-            mockUserDistributionCounterTransformer,
-            genericTransformers);
+        new FnApiMonitoringInfoToCounterUpdateTransformer(genericTransformers);
 
     CounterUpdate expectedResult = new CounterUpdate();
-    when(mockGenericTransformer1.transform(any())).thenReturn(expectedResult);
+    when(mockTransformer1.transform(any())).thenReturn(expectedResult);
 
     MonitoringInfo monitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn(validUrn)
+            .setUrn("any:other:urn")
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
             .build();
 
     CounterUpdate result = testObject.transform(monitoringInfo);
 
-    assertSame(expectedResult, result);
+    assertSame(null, result);
   }
 }