You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:17 UTC
[4/4] beam git commit: Removing Aggregators from Examples
Removing Aggregators from Examples
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/904b4130
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/904b4130
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/904b4130
Branch: refs/heads/master
Commit: 904b4130b8ba4e9edc0c776da99cbe46d00d9442
Parents: 1d97bdf
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 12:50:38 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 20 +++++++++-----------
.../org/apache/beam/examples/WordCount.java | 9 ++++-----
.../cookbook/CombinePerKeyExamples.java | 9 ++++-----
.../beam/examples/complete/game/GameStats.java | 8 ++++----
.../beam/examples/complete/game/UserScore.java | 8 ++++----
5 files changed, 25 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 031f317..4c82f46 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -22,14 +22,14 @@ import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
* <p>New Concepts:
* <pre>
* 1. Logging using SLF4J, even in a distributed environment
- * 2. Creating a custom aggregator (runners have varying levels of support)
+ * 2. Creating a custom metric (runners have varying levels of support)
* 3. Testing your Pipeline via PAssert
* </pre>
*
@@ -90,14 +90,12 @@ public class DebuggingWordCount {
}
/**
- * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
- * runner provides varying levels of support for aggregators, and may expose them
+ * Concept #2: A custom metric can track values in your pipeline as it runs. Each
+ * runner provides varying levels of support for metrics, and may expose them
* in a dashboard, etc.
*/
- private final Aggregator<Long, Long> matchedWords =
- createAggregator("matchedWords", Sum.ofLongs());
- private final Aggregator<Long, Long> unmatchedWords =
- createAggregator("unmatchedWords", Sum.ofLongs());
+ private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
+ private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
@ProcessElement
public void processElement(ProcessContext c) {
@@ -105,14 +103,14 @@ public class DebuggingWordCount {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
LOG.debug("Matched: " + c.element().getKey());
- matchedWords.addValue(1L);
+ matchedWords.inc();
c.output(c.element());
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
LOG.trace("Did not match: " + c.element().getKey());
- unmatchedWords.addValue(1L);
+ unmatchedWords.inc();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 7e21d47..0c786bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -19,19 +19,19 @@ package org.apache.beam.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -86,13 +86,12 @@ public class WordCount {
* to a ParDo in the pipeline.
*/
static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
+ emptyLines.inc();
}
// Split the line into words.
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 8d13b90..39553a5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -24,18 +24,18 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -79,8 +79,7 @@ public class CombinePerKeyExamples {
* outputs word, play_name.
*/
static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
- private final Aggregator<Long, Long> smallerWords =
- createAggregator("smallerWords", Sum.ofLongs());
+ private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
@ProcessElement
public void processElement(ProcessContext c){
@@ -92,7 +91,7 @@ public class CombinePerKeyExamples {
} else {
// Track how many smaller words we're not including. This information will be
// visible in the Monitoring UI.
- smallerWords.addValue(1L);
+ smallerWords.inc();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 9c79fad..b6c05be 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -26,10 +26,11 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -126,8 +127,7 @@ public class GameStats extends LeaderBoard {
.apply("ProcessAndFilter", ParDo
// use the derived mean total score as a side input
.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
- private final Aggregator<Long, Long> numSpammerUsers =
- createAggregator("SpammerUsers", Sum.ofLongs());
+ private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
@ProcessElement
public void processElement(ProcessContext c) {
Integer score = c.element().getValue();
@@ -135,7 +135,7 @@ public class GameStats extends LeaderBoard {
if (score > (gmc * SCORE_WEIGHT)) {
LOG.info("user " + c.element().getKey() + " spammer score " + score
+ " with mean " + gmc);
- numSpammerUsers.addValue(1L);
+ numSpammerUsers.inc();
c.output(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index b4b023f..0adaabc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -25,12 +25,13 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -125,8 +126,7 @@ public class UserScore {
// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
- private final Aggregator<Long, Long> numParseErrors =
- createAggregator("ParseErrors", Sum.ofLongs());
+ private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");
@ProcessElement
public void processElement(ProcessContext c) {
@@ -139,7 +139,7 @@ public class UserScore {
GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
c.output(gInfo);
} catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
- numParseErrors.addValue(1L);
+ numParseErrors.inc();
LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
}
}