You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 21:59:00 UTC
[3/4] incubator-beam git commit: Revise DebuggingWordCount to be more
portable
Revise DebuggingWordCount to be more portable
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bc9c3f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bc9c3f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bc9c3f0
Branch: refs/heads/master
Commit: 9bc9c3f0fcab4571f60d4eb872df0904ee0eb99d
Parents: 0b05a8c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 14:50:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 8 13:51:25 2016 -0800
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 67 +++++---------------
.../beam/examples/DebuggingWordCountTest.java | 15 ++++-
2 files changed, 28 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 1d2c83a..f7c537c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory;
*
* <p>New Concepts:
* <pre>
- * 1. Logging to Cloud Logging
- * 2. Controlling worker log levels
- * 3. Creating a custom aggregator
- * 4. Testing your Pipeline via PAssert
+ * 1. Logging using SLF4J, even in a distributed environment
+ * 2. Creating a custom aggregator (runners have varying levels of support)
+ * 3. Testing your Pipeline via PAssert
* </pre>
*
* <p>To execute this pipeline locally, specify general pipeline configuration:
@@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory;
* }
* </pre>
*
- * <p>To use the additional logging discussed below, specify:
- * <pre>{@code
- * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
- * }
- * </pre>
- *
- * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
- * the quotations as appropriate for your shell. For example, in <code>bash</code>:
- * <pre>
- * mvn compile exec:java ... \
- * -Dexec.args="... \
- * --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}"
- * </pre>
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
*
- * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud
- * Logging by default at "INFO" log level and higher. One may override log levels for specific
- * logging namespaces by specifying:
- * <pre><code>
- * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
- * </code></pre>
- * For example, by specifying:
- * <pre><code>
- * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
- * </code></pre>
- * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
- * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in
- * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
- * logging configuration can be overridden by specifying
- * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
- * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
- * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
- * that changing the default worker log level to TRACE or DEBUG will significantly increase
- * the amount of logs output.
- *
- * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
- * and can be overridden with {@code --inputFile}.
*/
public class DebuggingWordCount {
/** A DoFn that filters for a specific key based upon a regular expression. */
public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
/**
- * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
- * as the logger. All log statements emitted by this logger will be referenced by this name
- * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging
- * about the Cloud Logging UI.
+ * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the
+ * logger. Depending on your SLF4J configuration, log statements will likely be qualified by
+ * this name.
+ *
+ * <p>Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J
+ * configuration that is most appropriate for their logging integration.
*/
private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
@@ -122,11 +90,9 @@ public class DebuggingWordCount {
}
/**
- * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
- * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
- * Dataflow service. These aggregators below track the number of matched and unmatched words.
- * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
- * the Dataflow Monitoring UI.
+ * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
+ * runner provides varying levels of support for aggregators, and may expose them
+ * in a dashboard, etc.
*/
private final Aggregator<Long, Long> matchedWords =
createAggregator("matchedWords", new Sum.SumLongFn());
@@ -137,8 +103,7 @@ public class DebuggingWordCount {
public void processElement(ProcessContext c) {
if (filter.matcher(c.element().getKey()).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
- // using the Dataflow service, these log lines will appear in the Cloud Logging UI
- // only if the log level is set to "DEBUG" or lower.
+ // these log lines will appear only if the log level is set to "DEBUG" or lower.
LOG.debug("Matched: " + c.element().getKey());
matchedWords.addValue(1L);
c.output(c.element());
@@ -178,7 +143,7 @@ public class DebuggingWordCount {
.apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
/**
- * Concept #4: PAssert is a set of convenient PTransforms in the style of
+ * Concept #3: PAssert is a set of convenient PTransforms in the style of
* Hamcrest's collection matchers that can be used when writing Pipeline level tests
* to validate the contents of PCollections. PAssert is best used in unit tests
* with small data sets but is demonstrated here as a teaching tool.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
index c1bd5d4..054277a 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.examples;
import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.StandardCharsets;
+import org.apache.beam.examples.DebuggingWordCount.WordCountOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -35,9 +37,16 @@ public class DebuggingWordCountTest {
@Test
public void testDebuggingWordCount() throws Exception {
- File file = tmpFolder.newFile();
- Files.write("stomach secret Flourish message Flourish here Flourish", file,
+ File inputFile = tmpFolder.newFile();
+ File outputFile = tmpFolder.newFile();
+ Files.write(
+ "stomach secret Flourish message Flourish here Flourish",
+ inputFile,
StandardCharsets.UTF_8);
- DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()});
+ WordCountOptions options =
+ TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
+ options.setInputFile(inputFile.getAbsolutePath());
+ options.setOutput(outputFile.getAbsolutePath());
+ DebuggingWordCount.main(TestPipeline.convertToArgs(options));
}
}