You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 18:03:06 UTC
[beam] branch master updated:
FnApiMonitoringInfoToCounterUpdateTransformer with User counters
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5c2e983 FnApiMonitoringInfoToCounterUpdateTransformer with User counters
new 0a98fce Merge pull request #8456 from Ardagan/FixUserCounters
5c2e983 is described below
commit 5c2e9839a47921d30c39e7e32126a099fd103c58
Author: Mikhail Gryzykhin <mi...@google.com>
AuthorDate: Wed May 1 11:08:57 2019 -0700
FnApiMonitoringInfoToCounterUpdateTransformer with User counters
---
...piMonitoringInfoToCounterUpdateTransformer.java | 21 ++++------
...nitoringInfoToCounterUpdateTransformerTest.java | 49 ++++++++--------------
2 files changed, 25 insertions(+), 45 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 49058b6..b8f3bf2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -35,19 +35,22 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
public class FnApiMonitoringInfoToCounterUpdateTransformer
implements MonitoringInfoToCounterUpdateTransformer {
- final UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer;
- final UserDistributionMonitoringInfoToCounterUpdateTransformer userDistributionCounterTransformer;
final Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers = new HashMap<>();
public FnApiMonitoringInfoToCounterUpdateTransformer(
Map<String, DataflowStepContext> stepContextMap,
Map<String, NameContext> sdkPCollectionIdToNameContext) {
SpecMonitoringInfoValidator specValidator = new SpecMonitoringInfoValidator();
- this.userCounterTransformer =
+
+ UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer =
new UserMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
+ counterTransformers.put(userCounterTransformer.getSupportedUrnPrefix(), userCounterTransformer);
- this.userDistributionCounterTransformer =
+ UserDistributionMonitoringInfoToCounterUpdateTransformer userDistributionCounterTransformer =
new UserDistributionMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
+ counterTransformers.put(
+ userDistributionCounterTransformer.getSupportedUrnPrefix(),
+ userDistributionCounterTransformer);
MSecMonitoringInfoToCounterUpdateTransformer msecTransformer =
new MSecMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
@@ -67,11 +70,7 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
/** Allows for injection of user and generic counter transformers for more convenient testing. */
@VisibleForTesting
public FnApiMonitoringInfoToCounterUpdateTransformer(
- UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer,
- UserDistributionMonitoringInfoToCounterUpdateTransformer userDistributionCounterTransformer,
Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers) {
- this.userCounterTransformer = userCounterTransformer;
- this.userDistributionCounterTransformer = userDistributionCounterTransformer;
this.counterTransformers.putAll(counterTransformers);
}
@@ -79,12 +78,6 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
@Nullable
public CounterUpdate transform(MonitoringInfo src) {
String urn = src.getUrn();
- if (urn.startsWith(userCounterTransformer.getSupportedUrnPrefix())) {
- return userCounterTransformer.transform(src);
- } else if (urn.startsWith(userDistributionCounterTransformer.getSupportedUrnPrefix())) {
- return this.userDistributionCounterTransformer.transform(src);
- }
-
MonitoringInfoToCounterUpdateTransformer transformer = counterTransformers.get(urn);
return transformer == null ? null : transformer.transform(src);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
index 1a0f3b1..3dec6e1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import com.google.api.services.dataflow.model.CounterUpdate;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -34,13 +33,8 @@ import org.mockito.MockitoAnnotations;
public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
- @Mock private UserMonitoringInfoToCounterUpdateTransformer mockUserCounterTransformer;
-
- @Mock
- private UserDistributionMonitoringInfoToCounterUpdateTransformer
- mockUserDistributionCounterTransformer;
-
- @Mock private UserMonitoringInfoToCounterUpdateTransformer mockGenericTransformer1;
+ @Mock private UserMonitoringInfoToCounterUpdateTransformer mockTransformer2;
+ @Mock private UserMonitoringInfoToCounterUpdateTransformer mockTransformer1;
@Before
public void setUp() {
@@ -48,22 +42,22 @@ public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
}
@Test
- public void testTransformUtilizesUserCounterTransformerForUserCounters() {
- Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers =
- Collections.EMPTY_MAP;
+ public void testTransformUtilizesRelevantCounterTransformer() {
+ Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers = new HashMap<>();
+
+ final String validUrn = "urn1";
+ genericTransformers.put(validUrn, mockTransformer1);
+ genericTransformers.put("any:other:urn", mockTransformer2);
+
FnApiMonitoringInfoToCounterUpdateTransformer testObject =
- new FnApiMonitoringInfoToCounterUpdateTransformer(
- mockUserCounterTransformer,
- mockUserDistributionCounterTransformer,
- genericTransformers);
+ new FnApiMonitoringInfoToCounterUpdateTransformer(genericTransformers);
CounterUpdate expectedResult = new CounterUpdate();
- when(mockUserCounterTransformer.transform(any())).thenReturn(expectedResult);
- when(mockUserCounterTransformer.getSupportedUrnPrefix()).thenReturn("user:prefix:");
+ when(mockTransformer1.transform(any())).thenReturn(expectedResult);
MonitoringInfo monitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn("user:prefix:anyNamespace:anyName")
+ .setUrn(validUrn)
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
.build();
@@ -73,32 +67,25 @@ public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
}
@Test
- public void testTransformUtilizesRelevantCounterTransformerForNonUserCounters() {
+ public void testTransformReturnsNullOnUnknownUrn() {
Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers = new HashMap<>();
- final String validUrn = "urn1";
- genericTransformers.put(validUrn, mockGenericTransformer1);
- when(mockUserCounterTransformer.getSupportedUrnPrefix()).thenReturn("invalid:prefix:");
- when(mockUserDistributionCounterTransformer.getSupportedUrnPrefix())
- .thenReturn("invalid:prefix2:");
+ genericTransformers.put("beam:metric:user", mockTransformer2);
FnApiMonitoringInfoToCounterUpdateTransformer testObject =
- new FnApiMonitoringInfoToCounterUpdateTransformer(
- mockUserCounterTransformer,
- mockUserDistributionCounterTransformer,
- genericTransformers);
+ new FnApiMonitoringInfoToCounterUpdateTransformer(genericTransformers);
CounterUpdate expectedResult = new CounterUpdate();
- when(mockGenericTransformer1.transform(any())).thenReturn(expectedResult);
+ when(mockTransformer1.transform(any())).thenReturn(expectedResult);
MonitoringInfo monitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn(validUrn)
+ .setUrn("any:other:urn")
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
.build();
CounterUpdate result = testObject.transform(monitoringInfo);
- assertSame(expectedResult, result);
+ assertSame(null, result);
}
}