You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/13 22:39:35 UTC
[2/5] incubator-beam git commit: Add the ability to query metrics on
PipelineResult
Add the ability to query metrics on PipelineResult
All runners currently implement this by throwing an
UnsupportedOperationException.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51fee39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51fee39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51fee39b
Branch: refs/heads/master
Commit: 51fee39b7bc66d7f60ea2e0ce31e3cb516a89305
Parents: 8524ed9
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 10:55:05 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/runners/direct/DirectRunner.java | 7 +++++++
.../org/apache/beam/runners/flink/FlinkRunnerResult.java | 6 ++++++
.../beam/runners/dataflow/DataflowPipelineJob.java | 7 +++++++
.../runners/spark/translation/EvaluationContext.java | 6 ++++++
.../main/java/org/apache/beam/sdk/PipelineResult.java | 11 +++++++++++
5 files changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index a72f7ae..e13046d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
@@ -380,6 +381,12 @@ public class DirectRunner
};
}
+ @Override
+ public MetricResults metrics() {
+ throw new UnsupportedOperationException(
+ "The DirectRunner does not currently support metrics.");
+ }
+
/**
* Blocks until the {@link Pipeline} execution represented by this
* {@link DirectPipelineResult} is complete, returning the terminal state.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 90bb64d..6b15485 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
@@ -86,4 +87,9 @@ public class FlinkRunnerResult implements PipelineResult {
public State waitUntilFinish(Duration duration) {
throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish.");
}
+
+ @Override
+ public MetricResults metrics() {
+ throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 269b824..bbcf11f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
@@ -426,6 +427,12 @@ public class DataflowPipelineJob implements PipelineResult {
}
}
+ @Override
+ public MetricResults metrics() {
+ throw new UnsupportedOperationException(
+ "The DataflowRunner does not currently support metrics.");
+ }
+
private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
throws IOException {
if (aggregatorTransforms.contains(aggregator)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 2397276..1944b6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -263,6 +264,11 @@ public class EvaluationContext implements EvaluationResult {
}
@Override
+ public MetricResults metrics() {
+ throw new UnsupportedOperationException("The SparkRunner does not currently support metrics.");
+ }
+
+ @Override
public <T> Iterable<T> get(PCollection<T> pcollection) {
@SuppressWarnings("unchecked")
RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index d9cdc16..d7774bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -18,6 +18,9 @@
package org.apache.beam.sdk;
import java.io.IOException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
@@ -127,4 +130,12 @@ public interface PipelineResult {
return hasReplacement;
}
}
+
+ /**
+ * Return the object to access metrics from the pipeline.
+ *
+ * @throws UnsupportedOperationException if the runner doesn't support retrieving metrics.
+ */
+ @Experimental(Kind.METRICS)
+ MetricResults metrics();
}