You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/18 16:29:01 UTC
[1/2] incubator-beam git commit: [BEAM-462] Replace
MonitoringUtil.PrintHandler with a handler that utilizes a Java logger
Repository: incubator-beam
Updated Branches:
refs/heads/master a0afd6665 -> bfbfabf6d
[BEAM-462] Replace MonitoringUtil.PrintHandler with a handler that utilizes a Java logger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e4d9776
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e4d9776
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e4d9776
Branch: refs/heads/master
Commit: 7e4d977660906363b9c5ab55eababdd7144d6b91
Parents: a0afd66
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jul 18 08:51:32 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 18 09:28:20 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 2 +-
.../dataflow/BlockingDataflowRunner.java | 5 +-
.../BlockingDataflowPipelineOptions.java | 27 --------
.../dataflow/testing/TestDataflowRunner.java | 2 +-
.../runners/dataflow/util/MonitoringUtil.java | 70 +++++++++-----------
.../dataflow/util/MonitoringUtilTest.java | 60 +++++++++++++++++
6 files changed, 95 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index ad00a14..8f9be31 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -306,7 +306,7 @@ public class ExampleUtils {
addShutdownHook(jobsToCancel);
}
try {
- job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
+ job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
} catch (Exception e) {
throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
index 5c59bc2..f7f7dc8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
@@ -41,7 +41,7 @@ import javax.annotation.Nullable;
* A {@link PipelineRunner} that's like {@link DataflowRunner}
* but that waits for the launched job to finish.
*
- * <p>Prints out job status updates and console messages while it waits.
+ * <p>Logs job status updates and console messages while it waits.
*
* <p>Returns the final job state, or throws an exception if the job
* fails or cannot be monitored.
@@ -117,8 +117,7 @@ public class BlockingDataflowRunner extends
State result;
try {
result = job.waitToFinish(
- BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
- new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+ BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
} catch (IOException | InterruptedException ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
index 809df35..5d8d1a1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
@@ -18,38 +18,11 @@
package org.apache.beam.runners.dataflow.options;
import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
/**
* Options that are used to configure the {@link BlockingDataflowRunner}.
*/
@Description("Configure options on the BlockingDataflowRunner.")
public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
- /**
- * Output stream for job status messages.
- */
- @Description("Where messages generated during execution of the Dataflow job will be output.")
- @JsonIgnore
- @Hidden
- @Default.InstanceFactory(StandardOutputFactory.class)
- PrintStream getJobMessageOutput();
- void setJobMessageOutput(PrintStream value);
-
- /**
- * Returns a default of {@link System#out}.
- */
- public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
- @Override
- public PrintStream create(PipelineOptions options) {
- return System.out;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 6894a10..b60e1be 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -107,7 +107,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
assertThat(job, testPipelineOptions.getOnCreateMatcher());
CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
- job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+ job, new MonitoringUtil.LoggingHandler());
try {
final Optional<Boolean> result;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index 67cdfa6..4d12e66 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -27,12 +27,14 @@ import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
@@ -63,6 +65,11 @@ public final class MonitoringUtil {
.put("JOB_STATE_CANCELLED", State.CANCELLED)
.put("JOB_STATE_UPDATED", State.UPDATED)
.build();
+ private static final String JOB_MESSAGE_ERROR = "JOB_MESSAGE_ERROR";
+ private static final String JOB_MESSAGE_WARNING = "JOB_MESSAGE_WARNING";
+ private static final String JOB_MESSAGE_BASIC = "JOB_MESSAGE_BASIC";
+ private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED";
+ private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG";
private String projectId;
private Messages messagesClient;
@@ -76,53 +83,38 @@ public final class MonitoringUtil {
void process(List<JobMessage> messages);
}
- /** A handler that prints monitoring messages to a stream. */
- public static class PrintHandler implements JobMessagesHandler {
- private PrintStream out;
-
- /**
- * Construct the handler.
- *
- * @param stream The stream to write the messages to.
- */
- public PrintHandler(PrintStream stream) {
- out = stream;
- }
+ /** A handler that logs monitoring messages. */
+ public static class LoggingHandler implements JobMessagesHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingHandler.class);
@Override
public void process(List<JobMessage> messages) {
for (JobMessage message : messages) {
- if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
- continue;
- }
- String importanceString = null;
- if (message.getMessageImportance() == null) {
- continue;
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- importanceString = "Error: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
- importanceString = "Warning: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
- importanceString = "Basic: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
- importanceString = "Detail: ";
- } else {
- // TODO: Remove filtering here once getJobMessages supports minimum
- // importance.
+ if (Strings.isNullOrEmpty(message.getMessageText())) {
continue;
}
+
@Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
- if (time == null) {
- out.print("UNKNOWN TIMESTAMP: ");
- } else {
- out.print(time + ": ");
- }
- if (importanceString != null) {
- out.print(importanceString);
+ String logMessage = (time == null ? "UNKNOWN TIMESTAMP: " : time + ": ")
+ + message.getMessageText();
+ switch (message.getMessageImportance()) {
+ case JOB_MESSAGE_ERROR:
+ LOG.error(logMessage);
+ break;
+ case JOB_MESSAGE_WARNING:
+ LOG.warn(logMessage);
+ break;
+ case JOB_MESSAGE_BASIC:
+ case JOB_MESSAGE_DETAILED:
+ LOG.info(logMessage);
+ break;
+ case JOB_MESSAGE_DEBUG:
+ LOG.debug(logMessage);
+ break;
+ default:
+ LOG.trace(logMessage);
}
- out.println(message.getMessageText());
}
- out.flush();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 4b0ab2f..da07515 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -23,15 +23,19 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.TestCredential;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import org.joda.time.DateTime;
import org.joda.time.Instant;
+import org.joda.time.chrono.ISOChronology;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -40,6 +44,7 @@ import org.junit.runners.JUnit4;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -50,6 +55,7 @@ public class MonitoringUtilTest {
private static final String PROJECT_ID = "someProject";
private static final String JOB_ID = "1234";
+ @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LoggingHandler.class);
@Rule public ExpectedException thrown = ExpectedException.none();
@Test
@@ -147,4 +153,58 @@ public class MonitoringUtilTest {
+ "gcloud alpha dataflow jobs --project=someProject cancel 1234",
cancelCommand);
}
+
+ @Test
+ public void testLoggingHandler() {
+ DateTime errorTime = new DateTime(1000L, ISOChronology.getInstanceUTC());
+ DateTime warningTime = new DateTime(2000L, ISOChronology.getInstanceUTC());
+ DateTime basicTime = new DateTime(3000L, ISOChronology.getInstanceUTC());
+ DateTime detailedTime = new DateTime(4000L, ISOChronology.getInstanceUTC());
+ DateTime debugTime = new DateTime(5000L, ISOChronology.getInstanceUTC());
+ DateTime unknownTime = new DateTime(6000L, ISOChronology.getInstanceUTC());
+ JobMessage errorJobMessage = new JobMessage();
+ errorJobMessage.setMessageImportance("JOB_MESSAGE_ERROR");
+ errorJobMessage.setMessageText("ERRORERROR");
+ errorJobMessage.setTime(TimeUtil.toCloudTime(errorTime));
+ JobMessage warningJobMessage = new JobMessage();
+ warningJobMessage.setMessageImportance("JOB_MESSAGE_WARNING");
+ warningJobMessage.setMessageText("WARNINGWARNING");
+ warningJobMessage.setTime(TimeUtil.toCloudTime(warningTime));
+ JobMessage basicJobMessage = new JobMessage();
+ basicJobMessage.setMessageImportance("JOB_MESSAGE_BASIC");
+ basicJobMessage.setMessageText("BASICBASIC");
+ basicJobMessage.setTime(TimeUtil.toCloudTime(basicTime));
+ JobMessage detailedJobMessage = new JobMessage();
+ detailedJobMessage.setMessageImportance("JOB_MESSAGE_DETAILED");
+ detailedJobMessage.setMessageText("DETAILEDDETAILED");
+ detailedJobMessage.setTime(TimeUtil.toCloudTime(detailedTime));
+ JobMessage debugJobMessage = new JobMessage();
+ debugJobMessage.setMessageImportance("JOB_MESSAGE_DEBUG");
+ debugJobMessage.setMessageText("DEBUGDEBUG");
+ debugJobMessage.setTime(TimeUtil.toCloudTime(debugTime));
+ JobMessage unknownJobMessage = new JobMessage();
+ unknownJobMessage.setMessageImportance("JOB_MESSAGE_UNKNOWN");
+ unknownJobMessage.setMessageText("UNKNOWNUNKNOWN");
+ unknownJobMessage.setTime("");
+ JobMessage emptyJobMessage = new JobMessage();
+ emptyJobMessage.setMessageImportance("JOB_MESSAGE_EMPTY");
+ emptyJobMessage.setTime(TimeUtil.toCloudTime(unknownTime));
+
+ new LoggingHandler().process(Arrays.asList(errorJobMessage, warningJobMessage, basicJobMessage,
+ detailedJobMessage, debugJobMessage, unknownJobMessage));
+
+ expectedLogs.verifyError("ERRORERROR");
+ expectedLogs.verifyError(errorTime.toString());
+ expectedLogs.verifyWarn("WARNINGWARNING");
+ expectedLogs.verifyWarn(warningTime.toString());
+ expectedLogs.verifyInfo("BASICBASIC");
+ expectedLogs.verifyInfo(basicTime.toString());
+ expectedLogs.verifyInfo("DETAILEDDETAILED");
+ expectedLogs.verifyInfo(detailedTime.toString());
+ expectedLogs.verifyDebug("DEBUGDEBUG");
+ expectedLogs.verifyDebug(debugTime.toString());
+ expectedLogs.verifyTrace("UNKNOWN TIMESTAMP");
+ expectedLogs.verifyTrace("UNKNOWNUNKNOWN");
+ expectedLogs.verifyNotLogged(unknownTime.toString());
+ }
}
[2/2] incubator-beam git commit: [BEAM-462] Replace
MonitoringUtil.PrintHandler with a handler that utilizes a Java logger
Posted by lc...@apache.org.
[BEAM-462] Replace MonitoringUtil.PrintHandler with a handler that utilizes a Java logger
This closes #673
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfbfabf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfbfabf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfbfabf6
Branch: refs/heads/master
Commit: bfbfabf6dd26d61dcd88304813918144f33ed3e0
Parents: a0afd66 7e4d977
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jul 18 09:28:49 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 18 09:28:49 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 2 +-
.../dataflow/BlockingDataflowRunner.java | 5 +-
.../BlockingDataflowPipelineOptions.java | 27 --------
.../dataflow/testing/TestDataflowRunner.java | 2 +-
.../runners/dataflow/util/MonitoringUtil.java | 70 +++++++++-----------
.../dataflow/util/MonitoringUtilTest.java | 60 +++++++++++++++++
6 files changed, 95 insertions(+), 71 deletions(-)
----------------------------------------------------------------------