You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:17 UTC

[4/4] beam git commit: Removing Aggregators from Examples

Removing Aggregators from Examples


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/904b4130
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/904b4130
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/904b4130

Branch: refs/heads/master
Commit: 904b4130b8ba4e9edc0c776da99cbe46d00d9442
Parents: 1d97bdf
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 12:50:38 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       | 20 +++++++++-----------
 .../org/apache/beam/examples/WordCount.java     |  9 ++++-----
 .../cookbook/CombinePerKeyExamples.java         |  9 ++++-----
 .../beam/examples/complete/game/GameStats.java  |  8 ++++----
 .../beam/examples/complete/game/UserScore.java  |  8 ++++----
 5 files changed, 25 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 031f317..4c82f46 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -22,14 +22,14 @@ import java.util.List;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
  * <p>New Concepts:
  * <pre>
  *   1. Logging using SLF4J, even in a distributed environment
- *   2. Creating a custom aggregator (runners have varying levels of support)
+ *   2. Creating a custom metric (runners have varying levels of support)
  *   3. Testing your Pipeline via PAssert
  * </pre>
  *
@@ -90,14 +90,12 @@ public class DebuggingWordCount {
     }
 
     /**
-     * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
-     * runner provides varying levels of support for aggregators, and may expose them
+     * Concept #2: A custom metric can track values in your pipeline as it runs. Each
+     * runner provides varying levels of support for metrics, and may expose them
      * in a dashboard, etc.
      */
-    private final Aggregator<Long, Long> matchedWords =
-        createAggregator("matchedWords", Sum.ofLongs());
-    private final Aggregator<Long, Long> unmatchedWords =
-        createAggregator("unmatchedWords", Sum.ofLongs());
+    private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
+    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
@@ -105,14 +103,14 @@ public class DebuggingWordCount {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
         // these log lines will appear only if the log level is set to "DEBUG" or lower.
         LOG.debug("Matched: " + c.element().getKey());
-        matchedWords.addValue(1L);
+        matchedWords.inc();
         c.output(c.element());
       } else {
         // Log at the "TRACE" level each element that is not matched. Different log levels
         // can be used to control the verbosity of logging providing an effective mechanism
         // to filter less important information.
         LOG.trace("Did not match: " + c.element().getKey());
-        unmatchedWords.addValue(1L);
+        unmatchedWords.inc();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 7e21d47..0c786bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -19,19 +19,19 @@ package org.apache.beam.examples;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -86,13 +86,12 @@ public class WordCount {
    * to a ParDo in the pipeline.
    */
   static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc();
       }
 
       // Split the line into words.

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 8d13b90..39553a5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -24,18 +24,18 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -79,8 +79,7 @@ public class CombinePerKeyExamples {
    * outputs word, play_name.
    */
   static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
-    private final Aggregator<Long, Long> smallerWords =
-        createAggregator("smallerWords", Sum.ofLongs());
+    private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
 
     @ProcessElement
     public void processElement(ProcessContext c){
@@ -92,7 +91,7 @@ public class CombinePerKeyExamples {
       } else {
         // Track how many smaller words we're not including. This information will be
         // visible in the Monitoring UI.
-        smallerWords.addValue(1L);
+        smallerWords.inc();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 9c79fad..b6c05be 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -26,10 +26,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -126,8 +127,7 @@ public class GameStats extends LeaderBoard {
           .apply("ProcessAndFilter", ParDo
               // use the derived mean total score as a side input
               .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-                private final Aggregator<Long, Long> numSpammerUsers =
-                  createAggregator("SpammerUsers", Sum.ofLongs());
+                private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
                 @ProcessElement
                 public void processElement(ProcessContext c) {
                   Integer score = c.element().getValue();
@@ -135,7 +135,7 @@ public class GameStats extends LeaderBoard {
                   if (score > (gmc * SCORE_WEIGHT)) {
                     LOG.info("user " + c.element().getKey() + " spammer score " + score
                         + " with mean " + gmc);
-                    numSpammerUsers.addValue(1L);
+                    numSpammerUsers.inc();
                     c.output(c.element());
                   }
                 }

http://git-wip-us.apache.org/repos/asf/beam/blob/904b4130/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index b4b023f..0adaabc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -25,12 +25,13 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -125,8 +126,7 @@ public class UserScore {
 
     // Log and count parse errors.
     private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
-    private final Aggregator<Long, Long> numParseErrors =
-        createAggregator("ParseErrors", Sum.ofLongs());
+    private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
@@ -139,7 +139,7 @@ public class UserScore {
         GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
         c.output(gInfo);
       } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
-        numParseErrors.addValue(1L);
+        numParseErrors.inc();
         LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
       }
     }