You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/25 21:03:19 UTC
[beam] branch master updated: Merge pull request #8095 from Ardagan/ElementCountJava
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 956a244 Merge pull request #8095 from Ardagan/ElementCountJava
956a244 is described below
commit 956a2449a7b7a71705d4ff2afbd4d3a05e9193cb
Author: Mikhail Gryzykhin <12...@users.noreply.github.com>
AuthorDate: Mon Mar 25 14:03:07 2019 -0700
Merge pull request #8095 from Ardagan/ElementCountJava
[BEAM-6824] Add Element count transformer for Java worker FnApi (#8095)
---
.../dataflow/worker/BatchDataflowWorker.java | 1 +
.../worker/BeamFnMapTaskExecutorFactory.java | 14 +++
.../worker/fn/control/BeamFnMapTaskExecutor.java | 3 +-
...ntMonitoringInfoToCounterUpdateTransformer.java | 113 ++++++++++++++++++++
...piMonitoringInfoToCounterUpdateTransformer.java | 8 +-
.../control/RegisterAndProcessBundleOperation.java | 8 ++
.../beam/runners/dataflow/worker/graph/Nodes.java | 8 +-
.../worker/graph/RegisterNodeFunction.java | 16 ++-
.../fn/control/BeamFnMapTaskExecutorTest.java | 4 +
...nitoringInfoToCounterUpdateTransformerTest.java | 116 +++++++++++++++++++++
.../RegisterAndProcessBundleOperationTest.java | 11 ++
.../runners/dataflow/worker/graph/NodesTest.java | 18 ++--
12 files changed, 309 insertions(+), 11 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 0c22f09..42b6c45 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -348,6 +348,7 @@ public class BatchDataflowWorker implements Closeable {
if (LOG.isDebugEnabled()) {
LOG.debug("Network as Graphviz .dot: {}", Networks.toDot(network));
}
+
worker =
mapTaskExecutorFactory.create(
sdkWorkerHarness.getControlClientHandler(),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index f54c207..10e833f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -427,6 +427,19 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
entry.getKey(), executionContext.getStepContext(operationContext));
}
+ ImmutableMap.Builder<String, NameContext> pcollectionIdToNameContext =
+ ImmutableMap.builder();
+ for (Map.Entry<String, NameContext> entry :
+ input.getPCollectionToPartialNameContextMap().entrySet()) {
+ pcollectionIdToNameContext.put(
+ entry.getKey(),
+ NameContext.create(
+ stageName,
+ entry.getValue().originalName(),
+ entry.getValue().systemName(),
+ entry.getValue().userName()));
+ }
+
ImmutableMap<String, DataflowOperationContext> ptransformIdToOperationContexts =
ptransformIdToOperationContextBuilder.build();
@@ -448,6 +461,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
ptransformIdToStepContext.build(),
ptransformIdToSideInputReaders,
ptransformIdToSideInputIdToPCollectionView,
+ pcollectionIdToNameContext.build(),
// TODO: Set NameContext properly for these operations.
executionContext.createOperationContext(
NameContext.create(stageName, stageName, stageName, stageName))));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index 1baf1d2..f88da99 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -402,7 +402,8 @@ public class BeamFnMapTaskExecutor extends DataflowMapTaskExecutor {
private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
final MonitoringInfoToCounterUpdateTransformer monitoringInfoToCounterUpdateTransformer =
new FnApiMonitoringInfoToCounterUpdateTransformer(
- bundleProcessOperation.getPtransformIdToUserStepContext());
+ this.bundleProcessOperation.getPtransformIdToUserStepContext(),
+ this.bundleProcessOperation.getPCollectionIdToNameContext());
counterUpdates =
monitoringInfos.stream()
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
new file mode 100644
index 0000000..f4c8565
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import com.google.api.services.dataflow.model.NameAndKind;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MonitoringInfoToCounterUpdateTransformer} capable of transforming ElementCount ({@code
+ * beam:metric:element_count:v1}) MonitoringInfos into Dataflow {@link CounterUpdate}s.
+ */
+public class ElementCountMonitoringInfoToCounterUpdateTransformer
+    implements MonitoringInfoToCounterUpdateTransformer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ElementCountMonitoringInfoToCounterUpdateTransformer.class);
+
+  private static final String SUPPORTED_URN = "beam:metric:element_count:v1";
+
+  // todo(migryz): extract and utilize pcollection label from beam_fn_api.proto
+  private static final String PCOLLECTION_LABEL = "PCOLLECTION";
+
+  private final SpecMonitoringInfoValidator specValidator;
+  private final Map<String, NameContext> pcollectionIdToNameContext;
+
+  /**
+   * @param specValidator SpecMonitoringInfoValidator to utilize for default validation.
+   * @param pcollectionIdToNameContext mapping from PCollection id to {@link NameContext}; the
+   *     NameContext's user name is used to build the DFE CounterUpdate name.
+   */
+  public ElementCountMonitoringInfoToCounterUpdateTransformer(
+      SpecMonitoringInfoValidator specValidator,
+      Map<String, NameContext> pcollectionIdToNameContext) {
+    this.specValidator = specValidator;
+    this.pcollectionIdToNameContext = pcollectionIdToNameContext;
+  }
+
+  /**
+   * Validates the provided monitoring info against specs and common safety checks.
+   *
+   * @param monitoringInfo to validate.
+   * @return {@code Optional.empty()} if all validation checks pass; otherwise an Optional holding
+   *     the error text.
+   * @throws RuntimeException if the monitoring info does not carry the ElementCount urn.
+   */
+  protected Optional<String> validate(MonitoringInfo monitoringInfo) {
+    Optional<String> validatorResult = specValidator.validate(monitoringInfo);
+    if (validatorResult.isPresent()) {
+      return validatorResult;
+    }
+
+    String urn = monitoringInfo.getUrn();
+    if (!urn.equals(SUPPORTED_URN)) {
+      throw new RuntimeException(String.format("Received unexpected counter urn: %s", urn));
+    }
+
+    // Check for a missing label explicitly: some Map implementations throw on containsKey(null).
+    String pcollectionId = monitoringInfo.getLabelsMap().get(PCOLLECTION_LABEL);
+    if (pcollectionId == null || !pcollectionIdToNameContext.containsKey(pcollectionId)) {
+      return Optional.of(
+          "Encountered ElementCount MonitoringInfo with unknown PCollectionId: "
+              + monitoringInfo.toString());
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Generates a CounterUpdate to send to DFE based on an ElementCount MonitoringInfo.
+   *
+   * @param monitoringInfo Monitoring info to transform.
+   * @return CounterUpdate generated from the provided monitoringInfo, or {@code null} if the
+   *     monitoring info fails validation (the reason is logged).
+   * @throws RuntimeException if the monitoring info does not carry the ElementCount urn.
+   */
+  @Override
+  public CounterUpdate transform(MonitoringInfo monitoringInfo) {
+    Optional<String> validationResult = validate(monitoringInfo);
+    if (validationResult.isPresent()) {
+      LOG.info(validationResult.get());
+      return null;
+    }
+
+    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+
+    // validate() guaranteed that the label is present and known to the mapping.
+    final String pcollectionId = monitoringInfo.getLabelsMap().get(PCOLLECTION_LABEL);
+    final String pcollectionName = pcollectionIdToNameContext.get(pcollectionId).userName();
+
+    String counterName = pcollectionName + "-ElementCount";
+    NameAndKind name = new NameAndKind();
+    name.setName(counterName).setKind("SUM");
+
+    return new CounterUpdate()
+        .setNameAndKind(name)
+        .setCumulative(true)
+        .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+  }
+
+  /** @return the single urn that this transformer can convert to CounterUpdates. */
+  public static String getSupportedUrn() {
+    return SUPPORTED_URN;
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 8990e3c..b8f8a3b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
/**
@@ -37,7 +38,8 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
final Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers = new HashMap<>();
public FnApiMonitoringInfoToCounterUpdateTransformer(
- Map<String, DataflowStepContext> stepContextMap) {
+ Map<String, DataflowStepContext> stepContextMap,
+ Map<String, NameContext> sdkPCollectionIdToNameContext) {
SpecMonitoringInfoValidator specValidator = new SpecMonitoringInfoValidator();
this.userCounterTransformer =
new UserMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap);
@@ -47,6 +49,10 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
for (String urn : msecTransformer.getSupportedUrns()) {
this.counterTransformers.put(urn, msecTransformer);
}
+ this.counterTransformers.put(
+ ElementCountMonitoringInfoToCounterUpdateTransformer.getSupportedUrn(),
+ new ElementCountMonitoringInfoToCounterUpdateTransformer(
+ specValidator, sdkPCollectionIdToNameContext));
}
/** Allows for injection of user and generic counter transformers for more convenient testing. */
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 6518555..7799e6f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -51,6 +51,7 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.worker.ByteStringCoder;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
@@ -100,6 +101,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
private final Table<String, String, PCollectionView<?>>
ptransformIdToSideInputIdToPCollectionView;
private final ConcurrentHashMap<StateKey, BagState<ByteString>> userStateData;
+ private final Map<String, NameContext> pcollectionIdToNameContext;
private @Nullable CompletionStage<InstructionResponse> registerFuture;
private @Nullable CompletionStage<InstructionResponse> processBundleResponse;
@@ -118,6 +120,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
Map<String, DataflowStepContext> ptransformIdToSystemStepContext,
Map<String, SideInputReader> ptransformIdToSideInputReader,
Table<String, String, PCollectionView<?>> ptransformIdToSideInputIdToPCollectionView,
+ Map<String, NameContext> pcollectionIdToNameContext,
OperationContext context) {
super(EMPTY_RECEIVERS, context);
this.idGenerator = idGenerator;
@@ -126,6 +129,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
this.registerRequest = registerRequest;
this.ptransformIdToSideInputReader = ptransformIdToSideInputReader;
this.ptransformIdToSideInputIdToPCollectionView = ptransformIdToSideInputIdToPCollectionView;
+ this.pcollectionIdToNameContext = pcollectionIdToNameContext;
ImmutableMap.Builder<String, DataflowStepContext> userStepContextsMap = ImmutableMap.builder();
for (Map.Entry<String, DataflowStepContext> entry :
ptransformIdToSystemStepContext.entrySet()) {
@@ -301,6 +305,10 @@ public class RegisterAndProcessBundleOperation extends Operation {
}
}
+ /**
+ * Returns the PCollection id to {@link NameContext} map supplied at construction. BeamFnMapTaskExecutor
+ * passes it to the monitoring-info transformer to name element-count counters.
+ */
+ public Map<String, NameContext> getPCollectionIdToNameContext() {
+ return pcollectionIdToNameContext;
+ }
+
public Map<String, DataflowStepContext> getPtransformIdToUserStepContext() {
return ptransformIdToUserStepContext;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 7a2d873..139b274 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -289,14 +289,16 @@ public class Nodes {
BeamFnApi.RegisterRequest request,
Map<String, NameContext> ptransformIdToPartialNameContextMap,
Map<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfoMap,
- Map<String, Iterable<PCollectionView<?>>> pTransformIdToPCollectionViewMap) {
+ Map<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViewMap,
+ Map<String, NameContext> pcollectionToPartialNameContextMap) {
checkNotNull(request);
checkNotNull(ptransformIdToPartialNameContextMap);
return new AutoValue_Nodes_RegisterRequestNode(
request,
ptransformIdToPartialNameContextMap,
ptransformIdToSideInputInfoMap,
- pTransformIdToPCollectionViewMap);
+ ptransformIdToPCollectionViewMap,
+ pcollectionToPartialNameContextMap);
}
public abstract BeamFnApi.RegisterRequest getRegisterRequest();
@@ -307,6 +309,8 @@ public class Nodes {
public abstract Map<String, Iterable<PCollectionView<?>>> getPTransformIdToPCollectionViewMap();
+ /**
+ * Returns the PCollection id to partial {@link NameContext} map. The NameContexts here are built
+ * without a stage name; the executor factory later re-creates them with the stage name filled in.
+ */
+ public abstract Map<String, NameContext> getPCollectionToPartialNameContextMap();
+
@Override
public String toString() {
// The request may be very large.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index 6dea449..c5e7d13 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -219,7 +219,11 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
ImmutableMap.builder();
ImmutableMap.Builder<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViews =
ImmutableMap.builder();
+ ImmutableMap.Builder<String, NameContext> pcollectionIdToNameContexts = ImmutableMap.builder();
+ // For each instruction output node:
+ // 1. Generate new Coder and register it with SDKComponents and ProcessBundleDescriptor.
+ // 2. Generate new PCollectionId and register it with ProcessBundleDescriptor.
for (InstructionOutputNode node :
Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
InstructionOutput instructionOutput = node.getInstructionOutput();
@@ -259,6 +263,8 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
e);
}
+ // Generate new PCollection ID and map it to relevant node.
+ // Will later be used to fill PTransform inputs/outputs information.
String pcollectionId = "generatedPcollection" + idGenerator.getId();
processBundleDescriptor.putPcollections(
pcollectionId,
@@ -267,6 +273,13 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
.setWindowingStrategyId(fakeWindowingStrategyId)
.build());
nodesToPCollections.put(node, pcollectionId);
+ pcollectionIdToNameContexts.put(
+ pcollectionId,
+ NameContext.create(
+ null,
+ instructionOutput.getOriginalName(),
+ instructionOutput.getSystemName(),
+ instructionOutput.getName()));
}
processBundleDescriptor.putAllCoders(sdkComponents.toComponents().getCodersMap());
@@ -430,7 +443,8 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
RegisterRequest.newBuilder().addProcessBundleDescriptor(processBundleDescriptor).build(),
ptransformIdToNameContexts.build(),
ptransformIdToSideInputInfos.build(),
- ptransformIdToPCollectionViews.build());
+ ptransformIdToPCollectionViews.build(),
+ pcollectionIdToNameContexts.build());
}
/**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index 02ac1ba..38e3d7a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -184,6 +184,7 @@ public class BeamFnMapTaskExecutorTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
@@ -287,6 +288,7 @@ public class BeamFnMapTaskExecutorTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
@@ -396,6 +398,7 @@ public class BeamFnMapTaskExecutorTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
@@ -519,6 +522,7 @@ public class BeamFnMapTaskExecutorTest {
stepContextMap,
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformerTest.java
new file mode 100644
index 0000000..a45463b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ElementCountMonitoringInfoToCounterUpdateTransformer}. */
+public class ElementCountMonitoringInfoToCounterUpdateTransformerTest {
+
+  @Rule public final ExpectedException exception = ExpectedException.none();
+
+  @Mock private SpecMonitoringInfoValidator mockSpecValidator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfSpecValidationFails() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    Optional<String> error = Optional.of("Error text");
+    when(mockSpecValidator.validate(any())).thenReturn(error);
+    // Spec validation runs first, so a failing validator returns before the input is inspected.
+    assertEquals(null, testObject.transform(null));
+  }
+
+  @Test
+  public void testTransformThrowsIfMonitoringInfoWithWrongUrnPrefixReceived() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    // A user-namespaced urn is not the ElementCount urn and must be rejected.
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder().setUrn("beam:user:metric:element_count:v1").build();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    exception.expect(RuntimeException.class);
+    testObject.transform(monitoringInfo);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfMonitoringInfoWithUnknownPCollectionLabelPresent() {
+    // Empty mapping: no PCollection id is known to the transformer.
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:element_count:v1")
+            .putLabels("PCOLLECTION", "anyValue")
+            .build();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformReturnsValidCounterUpdateWhenValidMonitoringInfoReceived() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    pcollectionNameMapping.put(
+        "anyValue",
+        NameContext.create("anyStageName", "anyOriginName", "anySystemName", "transformedValue"));
+
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:element_count:v1")
+            .putLabels("PCOLLECTION", "anyValue")
+            .build();
+    ElementCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new ElementCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+    assertNotEquals(null, result);
+
+    // No metric payload is set, so the reported integer value defaults to 0; the counter name is
+    // derived from the mapped NameContext's user name.
+    assertEquals(
+        "{cumulative=true, integer={highBits=0, lowBits=0}, "
+            + "nameAndKind={kind=SUM, "
+            + "name=transformedValue-ElementCount}}",
+        result.toString());
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 16d4d11..71c36df 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -174,6 +174,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext)
.supportsRestart();
}
@@ -215,6 +216,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
// Ensure that the first time we start we send the register and process bundle requests
@@ -320,6 +322,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
operation.start();
@@ -423,6 +426,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
operation.start();
@@ -493,6 +497,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
operation.start();
@@ -613,6 +618,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of("testPTransformId", mockStepContext),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
operation.start();
@@ -740,6 +746,7 @@ public class RegisterAndProcessBundleOperationTest {
FullWindowedValueCoder.of(
KvCoder.of(ByteArrayCoder.of(), StringUtf8Coder.of()),
GlobalWindow.Coder.INSTANCE))),
+ ImmutableMap.of(),
mockContext);
operation.start();
@@ -791,6 +798,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
abortReference.set(operation::abort);
operation.start();
@@ -838,6 +846,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
abortReference.set(operation::abort);
operation.start();
@@ -875,6 +884,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
assertEquals(
@@ -897,6 +907,7 @@ public class RegisterAndProcessBundleOperationTest {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
+ ImmutableMap.of(),
mockContext);
operation.getProcessBundleInstructionId(); // this generates and caches bundleId
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
index c5a83f9..5a71d6d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
@@ -118,23 +118,29 @@ public class NodesTest {
GlobalWindow.Coder.INSTANCE))));
assertSame(
param,
- RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+ RegisterRequestNode.create(
+ param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
.getRegisterRequest());
assertSame(
nameContexts,
- RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+ RegisterRequestNode.create(
+ param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
.getPTransformIdToPartialNameContextMap());
assertSame(
sideInputInfos,
- RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+ RegisterRequestNode.create(
+ param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
.getPTransformIdToSideInputInfoMap());
assertSame(
pcollectionViews,
- RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews)
+ RegisterRequestNode.create(
+ param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of())
.getPTransformIdToPCollectionViewMap());
assertNotEquals(
- RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews),
- RegisterRequestNode.create(param, nameContexts, sideInputInfos, pcollectionViews));
+ RegisterRequestNode.create(
+ param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of()),
+ RegisterRequestNode.create(
+ param, nameContexts, sideInputInfos, pcollectionViews, ImmutableMap.of()));
}
@Test