You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/15 18:13:48 UTC
[beam] branch master updated: [BEAM-6138] Update java SDK to report
user distribution tuple metrics over the FN API
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e41ab8 [BEAM-6138] Update java SDK to report user distribution tuple metrics over the FN API
new 0294039 Merge pull request #8280 from ajamato/java_user_distribution
8e41ab8 is described below
commit 8e41ab88ead825a2cc30a6b2f3dabffb748ffa2a
Author: Alex Amato <aj...@google.com>
AuthorDate: Thu Apr 11 13:00:19 2019 -0700
[BEAM-6138] Update java SDK to report user distribution tuple metrics over the FN
API
---
.../runners/core/metrics/MetricsContainerImpl.java | 50 ++++++++++++++-
.../core/metrics/MonitoringInfoConstants.java | 2 +
.../core/metrics/SimpleMonitoringInfoBuilder.java | 28 ++++++++-
.../core/metrics/SpecMonitoringInfoValidator.java | 2 +-
.../core/metrics/MetricsContainerImplTest.java | 56 +++++++++++++++++
.../metrics/SimpleMonitoringInfoBuilderTest.java | 31 +++++++++-
.../metrics/SpecMonitoringInfoValidatorTest.java | 15 ++++-
.../fnexecution/control/RemoteExecutionTest.java | 45 ++++++++++++++
.../org/apache/beam/runners/jet/DAGBuilder.java | 6 +-
.../org/apache/beam/runners/jet/JetRunner.java | 6 +-
.../beam/runners/jet/JetTransformTranslator.java | 6 +-
.../beam/runners/jet/JetTransformTranslators.java | 72 +++++++++++++++++-----
.../beam/runners/jet/metrics/JetMetricResults.java | 3 +-
.../runners/jet/processors/BoundedSourceP.java | 3 +-
.../beam/runners/jet/processors/TestStreamP.java | 58 +++++++++--------
.../beam/runners/jet/processors/WindowGroupP.java | 3 +-
16 files changed, 325 insertions(+), 61 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 827c9bf..a2cd286 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -27,7 +27,6 @@ import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -187,6 +186,45 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
return builder.build();
}
+ /**
+ * @param metricUpdate a user (MetricName) or system (MonitoringInfoMetricName) distribution update.
+ * @return The MonitoringInfo generated from the metricUpdate, or null if it is dropped.
+ */
+ @Nullable
+ private MonitoringInfo distributionUpdateToMonitoringInfo(
+ MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate) {
+ SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(true);
+ MetricName metricName = metricUpdate.getKey().metricName();
+ if (metricName instanceof MonitoringInfoMetricName) {
+ MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) metricName;
+ // System metric: reuse its own URN and copy all of its labels verbatim.
+ builder.setUrn(monitoringInfoName.getUrn());
+ for (Entry<String, String> e : monitoringInfoName.getLabels().entrySet()) {
+ builder.setLabel(e.getKey(), e.getValue());
+ }
+ } else { // Note: (metricName instanceof MetricName) is always True.
+ // Represents a user distribution, identified by its namespace and name.
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
+ metricUpdate.getKey().metricName().getNamespace())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME, metricUpdate.getKey().metricName().getName());
+
+ // Drop if the stepname is not set. All user distributions must be defined for a PTransform,
+ // i.e. on a container bound to a step. NOTE(review): presumably equals the key's stepName.
+ if (this.stepName == null) {
+ // TODO(BEAM-7191): Consider logging a warning with a quiet logging API.
+ return null;
+ }
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricUpdate.getKey().stepName());
+ }
+ builder.setInt64DistributionValue(metricUpdate.getUpdate());
+ builder.setTimestampToNow();
+ return builder.build();
+ }
+
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
public Iterable<MonitoringInfo> getMonitoringInfos() {
// Extract user metrics and store as MonitoringInfos.
@@ -199,6 +237,14 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
monitoringInfos.add(mi);
}
}
+
+ for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate :
+ metricUpdates.distributionUpdates()) {
+ MonitoringInfo mi = distributionUpdateToMonitoringInfo(metricUpdate);
+ if (mi != null) {
+ monitoringInfos.add(mi);
+ }
+ }
return monitoringInfos;
}
@@ -266,7 +312,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
LOG.warn("Unsupported CounterData type: {}", counterData);
}
} else if (metric.hasDistributionData()) {
- DistributionData distributionData = metric.getDistributionData();
+ MetricsApi.DistributionData distributionData = metric.getDistributionData();
if (distributionData.hasIntDistributionData()) {
Distribution distribution = getDistribution(metricName);
IntDistributionData intDistributionData = distributionData.getIntDistributionData();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 3716709..ecfd38d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -42,6 +42,8 @@ public final class MonitoringInfoConstants {
public static final String USER_COUNTER = extractUrn(MonitoringInfoSpecs.Enum.USER_COUNTER);
public static final String USER_DISTRIBUTION_COUNTER =
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_COUNTER);
+ public static final String SAMPLED_BYTE_SIZE =
+ extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
}
/** Standardised MonitoringInfo labels that can be utilized by runners. */
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index 23fa440..d99ee84 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -42,8 +42,9 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
*
* <p>SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
* builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN); builder.setInt64Value(1);
- * builder.setPTransformLabel("myTransform"); builder.setPCollectionLabel("myPcollection");
- * MonitoringInfo mi = builder.build();
+ * builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myTransform");
+ * builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "myPcollection");
+ * MonitoringInfo mi = builder.build();
*/
public class SimpleMonitoringInfoBuilder {
private final boolean validateAndDropInvalid;
@@ -99,6 +100,29 @@ public class SimpleMonitoringInfoBuilder {
return this;
}
+ /**
+ * Copies count, sum, min and max from {@code data} into the MonitoringInfo's IntDistributionData
+ * and sets the type URN to DISTRIBUTION_INT64. Returns this builder for chaining.
+ */
+ public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
+ this.builder
+ .getMetricBuilder()
+ .getDistributionDataBuilder()
+ .getIntDistributionDataBuilder()
+ .setCount(data.count())
+ .setSum(data.sum())
+ .setMin(data.min())
+ .setMax(data.max());
+ this.setInt64DistributionTypeUrn();
+ return this;
+ }
+
+ /** Sets the type URN to DISTRIBUTION_INT64, the type for int64 distribution tuples. */
+ public SimpleMonitoringInfoBuilder setInt64DistributionTypeUrn() {
+ this.builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64);
+ return this;
+ }
+
/** Sets the the appropriate type URN for sum int64 counters. */
public SimpleMonitoringInfoBuilder setInt64TypeUrn() {
this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
index 8ac8bd6..58aef95 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
@@ -50,7 +50,7 @@ public class SpecMonitoringInfoValidator {
MonitoringInfoSpec spec = null;
for (MonitoringInfoSpec specIterator : specs) {
- if (monitoringInfo.getUrn().startsWith(specIterator.getUrn())) {
+ if (monitoringInfo.getUrn().equals(specIterator.getUrn())) {
spec = specIterator;
break;
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 61e4077..c2466ad 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -183,6 +183,62 @@ public class MetricsContainerImplTest {
}
@Test
+ public void testMonitoringInfosArePopulatedForUserDistributions() {
+ MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+ DistributionCell c1 = testObject.getDistribution(MetricName.named("ns", "name1"));
+ DistributionCell c2 = testObject.getDistribution(MetricName.named("ns", "name2"));
+ c1.update(5L);
+ c2.update(4L);
+ // Expect one USER_DISTRIBUTION_COUNTER per cell: count=1 and sum=min=max=the updated value.
+ SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+ builder1
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1")
+ .setInt64DistributionValue(DistributionData.create(5, 1, 5, 5));
+
+ SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+ builder2
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1")
+ .setInt64DistributionValue(DistributionData.create(4, 1, 4, 4));
+ // The container stamps each MonitoringInfo; clear it so they compare equal to the expected ones.
+ ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
+ for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+ actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
+ }
+
+ assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
+ }
+
+ @Test
+ public void testMonitoringInfosArePopulatedForSystemDistributions() {
+ MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+ HashMap<String, String> labels = new HashMap<>();
+ labels.put(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll1");
+ DistributionCell c1 =
+ testObject.getDistribution(
+ MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, labels));
+ c1.update(5L);
+ // System distributions keep their own URN and labels; no NAME/NAMESPACE/PTRANSFORM is added.
+ SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+ builder1
+ .setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE)
+ .setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll1")
+ .setInt64DistributionValue(DistributionData.create(5, 1, 5, 5));
+ // The container stamps each MonitoringInfo; clear it so it compares equal to the expected one.
+ ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
+ for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+ actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
+ }
+
+ assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build()));
+ }
+
+ @Test
public void testMonitoringInfosArePopulatedForABeamCounter() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
HashMap<String, String> labels = new HashMap<String, String>();
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
index ee42da4..1f4c608 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -21,6 +21,7 @@ import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -44,9 +45,9 @@ public class SimpleMonitoringInfoBuilderTest {
public void testReturnsExpectedMonitoringInfo() {
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
+
builder.setInt64Value(1);
builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "myPcollection");
-
// Pass now that the spec is fully met.
MonitoringInfo monitoringInfo = builder.build();
assertTrue(monitoringInfo != null);
@@ -60,4 +61,32 @@ public class SimpleMonitoringInfoBuilderTest {
"myPcollection",
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PCOLLECTION));
}
+
+ @Test
+ public void testUserDistribution() {
+ SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+ builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER);
+ builder.setLabel(MonitoringInfoConstants.Labels.NAME, "myName");
+ builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace");
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myStep");
+ assertNull(builder.build());
+ // build() returned null above: labels alone do not meet the spec without a distribution value.
+ builder.setInt64DistributionValue(DistributionData.create(10, 2, 1, 9));
+ // Pass now that the spec is fully met; create() args are (sum=10, count=2, min=1, max=9).
+ MonitoringInfo monitoringInfo = builder.build();
+ assertTrue(monitoringInfo != null);
+ assertEquals(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER, monitoringInfo.getUrn());
+ assertEquals(
+ "myName", monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.NAME, ""));
+ assertEquals(
+ "myNamespace",
+ monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.NAMESPACE, ""));
+ assertEquals(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64, monitoringInfo.getType());
+ MetricsApi.IntDistributionData distribution =
+ monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
+ assertEquals(10, distribution.getSum());
+ assertEquals(2, distribution.getCount());
+ assertEquals(9, distribution.getMax());
+ assertEquals(1, distribution.getMin());
+ }
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
index a993545..6a0e936 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
@@ -40,7 +40,9 @@ public class SpecMonitoringInfoValidatorTest {
public void validateReturnsErrorOnInvalidMonitoringInfoType() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
- .setUrn("beam:metric:user:someCounter")
+ .setUrn("beam:metric:user")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyCounter")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "namespace")
.setType("beam:metrics:bad_value")
.build();
assertTrue(testObject.validate(testInput).isPresent());
@@ -60,6 +62,17 @@ public class SpecMonitoringInfoValidatorTest {
testInput =
MonitoringInfo.newBuilder()
+ .setUrn(Urns.USER_DISTRIBUTION_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyDistribution")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "namespace")
+ .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyString")
+ .setType(TypeUrns.DISTRIBUTION_INT64)
+ .putLabels("dummy", "value")
+ .build();
+ assertFalse(testObject.validate(testInput).isPresent());
+
+ testInput =
+ MonitoringInfo.newBuilder()
.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT)
.setType(TypeUrns.SUM_INT64)
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "value")
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 1d87dd6..3aea2b9 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
import org.apache.beam.runners.core.metrics.MonitoringInfoMatchers;
@@ -500,6 +501,9 @@ public class RemoteExecutionTest implements Serializable {
final String processUserCounterName = "processUserCounter";
final String startUserCounterName = "startUserCounter";
final String finishUserCounterName = "finishUserCounter";
+ final String processUserDistributionName = "processUserDistribution";
+ final String startUserDistributionName = "startUserDistribution";
+ final String finishUserDistributionName = "finishUserDistribution";
Pipeline p = Pipeline.create();
// TODO(BEAM-6597): Remove sleeps in this test after collecting MonitoringInfos in
// ProcessBundleProgressResponses. Use CountDownLatches to wait in start, finish and process
@@ -518,6 +522,8 @@ public class RemoteExecutionTest implements Serializable {
public void startBundle() throws InterruptedException {
Thread.sleep(1000);
startCounter.inc(10);
+ Metrics.distribution(RemoteExecutionTest.class, startUserDistributionName)
+ .update(10);
}
@SuppressWarnings("unused")
@@ -531,6 +537,9 @@ public class RemoteExecutionTest implements Serializable {
ctxt.output("two");
Thread.sleep(1000);
Metrics.counter(RemoteExecutionTest.class, processUserCounterName).inc();
+ Metrics.distribution(
+ RemoteExecutionTest.class, processUserDistributionName)
+ .update(1);
}
emitted = true;
}
@@ -539,6 +548,8 @@ public class RemoteExecutionTest implements Serializable {
public void finishBundle() throws InterruptedException {
Thread.sleep(1000);
Metrics.counter(RemoteExecutionTest.class, finishUserCounterName).inc(100);
+ Metrics.distribution(RemoteExecutionTest.class, finishUserDistributionName)
+ .update(100);
}
}))
.setCoder(StringUtf8Coder.of());
@@ -624,6 +635,7 @@ public class RemoteExecutionTest implements Serializable {
public void onCompleted(ProcessBundleResponse response) {
List<Matcher<MonitoringInfo>> matchers = new ArrayList<Matcher<MonitoringInfo>>();
+ // User Counters.
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder
.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
@@ -657,6 +669,39 @@ public class RemoteExecutionTest implements Serializable {
builder.setInt64Value(100);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+ // User Distributions.
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME, processUserDistributionName);
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
+ builder.setInt64DistributionValue(DistributionData.create(1, 1, 1, 1));
+ matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME, startUserDistributionName);
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
+ builder.setInt64DistributionValue(DistributionData.create(10, 1, 10, 10));
+ matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME, finishUserDistributionName);
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
+ builder.setInt64DistributionValue(DistributionData.create(100, 1, 100, 100));
+ matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+
// The element counter should be counted only once for the pcollection.
// So there should be only two elements.
builder = new SimpleMonitoringInfoBuilder();
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java
index 7661113..3886e85 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java
@@ -101,9 +101,7 @@ public class DAGBuilder {
}
Vertex addVertex(String id, SupplierEx<Processor> processor) {
- return dag.newVertex(id, processor)
- .localParallelism(localParallelism)
- ;
+ return dag.newVertex(id, processor).localParallelism(localParallelism);
}
private void wireUp() {
@@ -208,7 +206,7 @@ public class DAGBuilder {
@Override
public Object applyEx(byte[] b) throws Exception {
- Object t = CoderUtils.decodeFromByteArray(coder, b); //todo: decoding twice....
+ Object t = CoderUtils.decodeFromByteArray(coder, b); // todo: decoding twice....
Object key = null;
if (t instanceof WindowedValue) {
t = ((WindowedValue) t).getValue();
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java
index efad9d3..562f6a4 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java
@@ -87,10 +87,8 @@ public class JetRunner extends PipelineRunner<PipelineResult> {
public PipelineResult run(Pipeline pipeline) {
Boolean startOwnCluster = options.getJetStartOwnCluster();
if (startOwnCluster) {
- Collection<JetInstance> jetInstances = Arrays.asList(
- Jet.newJetInstance(),
- Jet.newJetInstance()
- );
+ Collection<JetInstance> jetInstances =
+ Arrays.asList(Jet.newJetInstance(), Jet.newJetInstance());
LOG.info("Started " + jetInstances.size() + " Jet cluster members");
}
try {
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java
index 44b543c..ed4a76f 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java
@@ -25,5 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
interface JetTransformTranslator<T extends PTransform> {
- Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, TransformHierarchy.Node node, JetTranslationContext context);
+ Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ TransformHierarchy.Node node,
+ JetTranslationContext context);
}
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java
index 71f1264..c672eaf 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java
@@ -91,16 +91,21 @@ class JetTransformTranslators {
implements JetTransformTranslator<PTransform<PBegin, PCollection<T>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
if (!Utils.isBounded(appliedTransform)) {
throw new UnsupportedOperationException(); // todo
}
BoundedSource<T> source;
try {
- source = ReadTranslation.boundedSourceFromTransform(
- (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>) appliedTransform
- );
+ source =
+ ReadTranslation.boundedSourceFromTransform(
+ (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+ appliedTransform);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -129,7 +134,11 @@ class JetTransformTranslators {
implements JetTransformTranslator<PTransform<PCollection, PCollectionTuple>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
boolean usesStateOrTimers = Utils.usesStateOrTimers(appliedTransform);
DoFn<?, ?> doFn = Utils.getDoFn(appliedTransform);
@@ -234,7 +243,11 @@ class JetTransformTranslators {
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
String transformName = appliedTransform.getFullName();
PCollection<KV<K, InputT>> input = Utils.getInput(appliedTransform);
@@ -271,12 +284,18 @@ class JetTransformTranslators {
implements JetTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
PCollectionView<T> view;
try {
- view = CreatePCollectionViewTranslation.getView(
- (AppliedPTransform<PCollection<T>, PCollection<T>, PTransform<PCollection<T>, PCollection<T>>>) appliedTransform
- );
+ view =
+ CreatePCollectionViewTranslation.getView(
+ (AppliedPTransform<
+ PCollection<T>, PCollection<T>, PTransform<PCollection<T>, PCollection<T>>>)
+ appliedTransform);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -311,7 +330,11 @@ class JetTransformTranslators {
implements JetTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
Collection<PValue> mainInputs = Utils.getMainInputs(pipeline, node);
Map<String, Coder> inputCoders =
Utils.getCoders(
@@ -341,7 +364,11 @@ class JetTransformTranslators {
private static class WindowTranslator<T>
implements JetTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
WindowingStrategy<T, BoundedWindow> windowingStrategy =
(WindowingStrategy<T, BoundedWindow>)
((PCollection) Utils.getOutput(appliedTransform).getValue()).getWindowingStrategy();
@@ -372,7 +399,11 @@ class JetTransformTranslators {
private static class ImpulseTranslator
implements JetTransformTranslator<PTransform<PBegin, PCollection<byte[]>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
String transformName = appliedTransform.getFullName();
DAGBuilder dagBuilder = context.getDagBuilder();
String vertexId = dagBuilder.newVertexId(transformName);
@@ -391,7 +422,11 @@ class JetTransformTranslators {
private static class TestStreamTranslator<T>
implements JetTransformTranslator<PTransform<PBegin, PCollection<T>>> {
@Override
- public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+ public Vertex translate(
+ Pipeline pipeline,
+ AppliedPTransform<?, ?, ?> appliedTransform,
+ Node node,
+ JetTranslationContext context) {
String transformName = appliedTransform.getFullName();
DAGBuilder dagBuilder = context.getDagBuilder();
String vertexId = dagBuilder.newVertexId(transformName);
@@ -402,10 +437,12 @@ class JetTransformTranslators {
// the collection.
Map.Entry<TupleTag<?>, PValue> output = Utils.getOutput(appliedTransform);
Coder outputCoder = Utils.getCoder((PCollection) output.getValue());
- TestStream.TestStreamCoder<T> payloadCoder = TestStream.TestStreamCoder.of(testStream.getValueCoder());
+ TestStream.TestStreamCoder<T> payloadCoder =
+ TestStream.TestStreamCoder.of(testStream.getValueCoder());
byte[] encodedPayload = getEncodedPayload(testStream, payloadCoder);
Vertex vertex =
- dagBuilder.addVertex(vertexId, TestStreamP.supplier(encodedPayload, payloadCoder, outputCoder));
+ dagBuilder.addVertex(
+ vertexId, TestStreamP.supplier(encodedPayload, payloadCoder, outputCoder));
String outputEdgeId = Utils.getTupleTagId(output.getValue());
dagBuilder.registerCollectionOfEdge(outputEdgeId, output.getKey().getId());
@@ -413,7 +450,8 @@ class JetTransformTranslators {
return vertex;
}
- private static <T> byte[] getEncodedPayload(TestStream<T> testStream, TestStream.TestStreamCoder<T> coder) {
+ private static <T> byte[] getEncodedPayload(
+ TestStream<T> testStream, TestStream.TestStreamCoder<T> coder) {
try {
return CoderUtils.encodeToByteArray(coder, testStream);
} catch (CoderException e) {
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
index 08c5ec2..519e612 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
@@ -131,7 +131,8 @@ public class JetMetricResults extends MetricResults
.toList();
}
- private MetricResult<DistributionResult> distributionUpdateToResult(Map.Entry<MetricKey, DistributionData> entry) {
+ private MetricResult<DistributionResult> distributionUpdateToResult(
+ Map.Entry<MetricKey, DistributionData> entry) {
MetricKey key = entry.getKey();
DistributionResult distributionResult = entry.getValue().extractResult();
return MetricResult.create(key, distributionResult, distributionResult);
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java
index 37f58e8..5e52a4f 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java
@@ -90,7 +90,8 @@ public class BoundedSourceP<T> extends AbstractProcessor implements Traverser {
}
return outputCoder == null
? res
- : Utils.encodeWindowedValue(res, outputCoder); // todo: this is not nice, have done this only as a quick fix for
+ : Utils.encodeWindowedValue(
+ res, outputCoder); // todo: this is not nice, have done this only as a quick fix for
// BoundedSourcePTest
} catch (IOException e) {
throw rethrow(e);
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java
index aff3ac9..cf491d5 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java
@@ -47,34 +47,42 @@ public class TestStreamP extends AbstractProcessor {
@SuppressWarnings("unchecked")
private TestStreamP(byte[] payload, TestStream.TestStreamCoder payloadCoder, Coder outputCoder) {
List events = decodePayload(payload, payloadCoder).getEvents();
- traverser = Traversers.traverseStream(
- events.stream()
- .flatMap(
- event -> {
- if (event instanceof TestStream.WatermarkEvent) {
- Instant watermark = ((TestStream.WatermarkEvent) event).getWatermark();
- if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
- // this is an element added by advanceWatermarkToInfinity(), we ignore it,
- // it's always at the end
- return null;
- }
- return Stream.of(new Watermark(watermark.getMillis()));
- } else if (event instanceof TestStream.ElementEvent) {
- return StreamSupport.stream(((TestStream.ElementEvent<?>) event).getElements().spliterator(), false)
- .map(tv -> WindowedValue.timestampedValueInGlobalWindow(tv.getValue(), tv.getTimestamp()))
- .map(wV -> Utils.encodeWindowedValue(wV, outputCoder));
- } else {
- throw new UnsupportedOperationException("Event type not supported in TestStream: " + event.getClass() + ", event: " + event);
- }
- }
- )
- );
+ traverser =
+ Traversers.traverseStream(
+ events.stream()
+ .flatMap(
+ event -> {
+ if (event instanceof TestStream.WatermarkEvent) {
+ Instant watermark = ((TestStream.WatermarkEvent) event).getWatermark();
+ if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+ // this is an element added by advanceWatermarkToInfinity(), we ignore it,
+ // it's always at the end
+ return null;
+ }
+ return Stream.of(new Watermark(watermark.getMillis()));
+ } else if (event instanceof TestStream.ElementEvent) {
+ return StreamSupport.stream(
+ ((TestStream.ElementEvent<?>) event).getElements().spliterator(),
+ false)
+ .map(
+ tv ->
+ WindowedValue.timestampedValueInGlobalWindow(
+ tv.getValue(), tv.getTimestamp()))
+ .map(wV -> Utils.encodeWindowedValue(wV, outputCoder));
+ } else {
+ throw new UnsupportedOperationException(
+ "Event type not supported in TestStream: "
+ + event.getClass()
+ + ", event: "
+ + event);
+ }
+ }));
}
- public static <T> ProcessorMetaSupplier supplier(byte[] payload, TestStream.TestStreamCoder payloadCoder, Coder outputCoder) {
+ public static <T> ProcessorMetaSupplier supplier(
+ byte[] payload, TestStream.TestStreamCoder payloadCoder, Coder outputCoder) {
return ProcessorMetaSupplier.forceTotalParallelismOne(
- ProcessorSupplier.of(() -> new TestStreamP(payload, payloadCoder, outputCoder))
- );
+ ProcessorSupplier.of(() -> new TestStreamP(payload, payloadCoder, outputCoder)));
}
private static TestStream decodePayload(byte[] payload, TestStream.TestStreamCoder coder) {
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java
index 891be0e..29c4977 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java
@@ -109,7 +109,8 @@ public class WindowGroupP<K, V> extends AbstractProcessor {
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPane());
- KeyManager keyManager = keyManagers.computeIfAbsent(key, k -> new KeyManager(k, latestWatermark));
+ KeyManager keyManager =
+ keyManagers.computeIfAbsent(key, k -> new KeyManager(k, latestWatermark));
keyManager.processElement(updatedWindowedValue);
}
return appendableTraverser;