You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 21:59:00 UTC

[3/4] incubator-beam git commit: Revise DebuggingWordCount to be more portable

Revise DebuggingWordCount to be more portable


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bc9c3f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bc9c3f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bc9c3f0

Branch: refs/heads/master
Commit: 9bc9c3f0fcab4571f60d4eb872df0904ee0eb99d
Parents: 0b05a8c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 14:50:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 8 13:51:25 2016 -0800

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       | 67 +++++---------------
 .../beam/examples/DebuggingWordCountTest.java   | 15 ++++-
 2 files changed, 28 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 1d2c83a..f7c537c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory;
  *
  * <p>New Concepts:
  * <pre>
- *   1. Logging to Cloud Logging
- *   2. Controlling worker log levels
- *   3. Creating a custom aggregator
- *   4. Testing your Pipeline via PAssert
+ *   1. Logging using SLF4J, even in a distributed environment
+ *   2. Creating a custom aggregator (runners have varying levels of support)
+ *   3. Testing your Pipeline via PAssert
  * </pre>
  *
  * <p>To execute this pipeline locally, specify general pipeline configuration:
@@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  *
- * <p>To use the additional logging discussed below, specify:
- * <pre>{@code
- *   --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
- * }
- * </pre>
- *
- * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
- * the quotations as appropriate for your shell. For example, in <code>bash</code>:
- * <pre>
- * mvn compile exec:java ... \
- *   -Dexec.args="... \
- *     --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}"
- * </pre>
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
  *
- * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud
- * Logging by default at "INFO" log level and higher. One may override log levels for specific
- * logging namespaces by specifying:
- * <pre><code>
- *   --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
- * </code></pre>
- * For example, by specifying:
- * <pre><code>
- *   --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
- * </code></pre>
- * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
- * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in
- * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
- * logging configuration can be overridden by specifying
- * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
- * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
- * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
- * that changing the default worker log level to TRACE or DEBUG will significantly increase
- * the amount of logs output.
- *
- * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
- * and can be overridden with {@code --inputFile}.
  */
 public class DebuggingWordCount {
   /** A DoFn that filters for a specific key based upon a regular expression. */
   public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
     /**
-     * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
-     * as the logger. All log statements emitted by this logger will be referenced by this name
-     * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging
-     * about the Cloud Logging UI.
+     * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the
+     * logger. Depending on your SLF4J configuration, log statements will likely be qualified by
+     * this name.
+     *
+     * <p>Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J
+     * configuration that is most appropriate for their logging integration.
      */
     private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
 
@@ -122,11 +90,9 @@ public class DebuggingWordCount {
     }
 
     /**
-     * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
-     * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
-     * Dataflow service. These aggregators below track the number of matched and unmatched words.
-     * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
-     * the Dataflow Monitoring UI.
+     * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
+     * runner provides varying levels of support for aggregators, and may expose them
+     * in a dashboard, etc.
      */
     private final Aggregator<Long, Long> matchedWords =
         createAggregator("matchedWords", new Sum.SumLongFn());
@@ -137,8 +103,7 @@ public class DebuggingWordCount {
     public void processElement(ProcessContext c) {
       if (filter.matcher(c.element().getKey()).matches()) {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
-        // using the Dataflow service, these log lines will appear in the Cloud Logging UI
-        // only if the log level is set to "DEBUG" or lower.
+        // these log lines will appear only if the log level is set to "DEBUG" or lower.
         LOG.debug("Matched: " + c.element().getKey());
         matchedWords.addValue(1L);
         c.output(c.element());
@@ -178,7 +143,7 @@ public class DebuggingWordCount {
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 
     /**
-     * Concept #4: PAssert is a set of convenient PTransforms in the style of
+     * Concept #3: PAssert is a set of convenient PTransforms in the style of
      * Hamcrest's collection matchers that can be used when writing Pipeline level tests
      * to validate the contents of PCollections. PAssert is best used in unit tests
      * with small data sets but is demonstrated here as a teaching tool.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
index c1bd5d4..054277a 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.examples;
 import com.google.common.io.Files;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import org.apache.beam.examples.DebuggingWordCount.WordCountOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -35,9 +37,16 @@ public class DebuggingWordCountTest {
 
   @Test
   public void testDebuggingWordCount() throws Exception {
-    File file = tmpFolder.newFile();
-    Files.write("stomach secret Flourish message Flourish here Flourish", file,
+    File inputFile = tmpFolder.newFile();
+    File outputFile = tmpFolder.newFile();
+    Files.write(
+        "stomach secret Flourish message Flourish here Flourish",
+        inputFile,
         StandardCharsets.UTF_8);
-    DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()});
+    WordCountOptions options =
+        TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
+    options.setInputFile(inputFile.getAbsolutePath());
+    options.setOutput(outputFile.getAbsolutePath());
+    DebuggingWordCount.main(TestPipeline.convertToArgs(options));
   }
 }