You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/12 18:51:30 UTC

[1/2] incubator-beam git commit: Closes #315

Repository: incubator-beam
Updated Branches:
  refs/heads/master d2ad0ed31 -> 9f105ec17


Closes #315


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f105ec1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f105ec1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f105ec1

Branch: refs/heads/master
Commit: 9f105ec1760954fc4cd881015d74088796028875
Parents: d2ad0ed 58fa155
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 12 11:51:21 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 12 11:51:21 2016 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 56 ++++++++++++++++-
 .../DataflowPipelineTranslatorTest.java         | 64 +++++++++++++++++++-
 .../sdk/transforms/display/DisplayData.java     |  4 ++
 .../org/apache/beam/sdk/io/DatastoreIOTest.java |  4 +-
 .../sdk/transforms/display/DisplayDataTest.java | 45 +++++++++++++-
 5 files changed, 165 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: [BEAM-117] Runners should be resilient to DisplayData failure

Posted by dh...@apache.org.
[BEAM-117] Runners should be resilient to DisplayData failure

Display data is collected from PTransforms at pipeline construction
time. Collecting display data runs user code from the provided
transforms and fns. These components should be designed not to throw
during pipeline construction; however, if this code does throw, the
pipeline itself should not fail.

This PR makes DataflowPipelineTranslator, which collects display data
for the Dataflow runner, resilient to such failures. It also adds a
RunnableOnService test that verifies all runners are resilient to
display data failures. Other runners do not yet use display data,
but will get this validation for free when they do.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58fa1556
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58fa1556
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58fa1556

Branch: refs/heads/master
Commit: 58fa15565b0feb35f2570ae9523a2a26b859eb56
Parents: d2ad0ed
Author: Scott Wegner <sw...@google.com>
Authored: Tue May 10 11:19:14 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 12 11:51:21 2016 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 56 ++++++++++++++++-
 .../DataflowPipelineTranslatorTest.java         | 64 +++++++++++++++++++-
 .../sdk/transforms/display/DisplayData.java     |  4 ++
 .../org/apache/beam/sdk/io/DatastoreIOTest.java |  4 +-
 .../sdk/transforms/display/DisplayDataTest.java | 45 +++++++++++++-
 5 files changed, 165 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58fa1556/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 05879d9..f58ceff 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -29,6 +29,7 @@ import static org.apache.beam.sdk.util.Structs.addString;
 import static org.apache.beam.sdk.util.Structs.getString;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator;
@@ -55,6 +56,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -89,6 +91,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -547,7 +551,7 @@ public class DataflowPipelineTranslator {
       currentStep.setKind(type);
       steps.add(currentStep);
       addInput(PropertyNames.USER_NAME, getFullName(transform));
-      addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
+      addDisplayData(stepName, transform);
     }
 
     @Override
@@ -725,9 +729,21 @@ public class DataflowPipelineTranslator {
       outputInfoList.add(outputInfo);
     }
 
-    private void addDisplayData(String name, DisplayData displayData) {
+    private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
+      DisplayData displayData;
+      try {
+        displayData = DisplayData.from(hasDisplayData);
+      } catch (Exception e) {
+        String msg = String.format("Exception thrown while collecting display data for step: %s. "
+            + "Display data will be not be available for this step.", stepName);
+        DisplayDataException displayDataException = new DisplayDataException(msg, e);
+        LOG.warn(msg, displayDataException);
+
+        displayData = displayDataException.asDisplayData();
+      }
+
       List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
-      addList(getProperties(), name, list);
+      addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
     }
 
     @Override
@@ -1053,4 +1069,38 @@ public class DataflowPipelineTranslator {
       context.addOutput(tag.getId(), output);
     }
   }
+
+  /**
+   * Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner.
+   */
+  static class DisplayDataException extends Exception implements HasDisplayData {
+    public DisplayDataException(String message, Throwable cause) {
+      super(checkNotNull(message), checkNotNull(cause));
+    }
+
+    /**
+     * Retrieve a display data representation of the exception, which can be submitted to
+     * the service in place of the actual display data.
+     */
+    public DisplayData asDisplayData() {
+      return DisplayData.from(this);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Throwable cause = getCause();
+      builder
+        .add(DisplayData.item("exceptionMessage", getMessage()))
+        .add(DisplayData.item("exceptionType", cause.getClass()))
+        .add(DisplayData.item("exceptionCause", cause.getMessage()))
+        .add(DisplayData.item("stackTrace", stackTraceToString()));
+    }
+
+    private String stackTraceToString() {
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter printWriter = new PrintWriter(stringWriter);
+      printStackTrace(printWriter);
+      return stringWriter.toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58fa1556/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8e7ed96..ed7e67d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -21,8 +21,11 @@ import static org.apache.beam.sdk.util.Structs.addObject;
 import static org.apache.beam.sdk.util.Structs.getDictionary;
 import static org.apache.beam.sdk.util.Structs.getString;
 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -47,6 +50,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.RecordingPipelineVisitor;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -104,6 +108,7 @@ import java.util.Map;
 public class DataflowPipelineTranslatorTest implements Serializable {
 
   @Rule public transient ExpectedException thrown = ExpectedException.none();
+  @Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class);
 
   // A Custom Mockito matcher for an initial Job that checks that all
   // expected fields are set.
@@ -720,7 +725,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
 
     // Check that translation does fail.
-    thrown.expectCause(Matchers.allOf(
+    thrown.expectCause(allOf(
         instanceOf(IllegalArgumentException.class),
         ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
     t.translate(
@@ -966,4 +971,61 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
     assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
   }
+
+  @Test
+  public void testCapturesDisplayDataExceptions() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    final RuntimeException displayDataException = new RuntimeException("foobar");
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(new DoFn<Integer, Integer>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element());
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            throw displayDataException;
+          }
+        }));
+
+    Job job = translator.translate(
+        pipeline,
+        (DataflowPipelineRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList()).getJob();
+
+    String expectedMessage = "Display data will be not be available for this step";
+    logs.verifyWarn(expectedMessage);
+
+    List<Step> steps = job.getSteps();
+    assertEquals("Job should have 2 steps", 2, steps.size());
+
+    @SuppressWarnings("unchecked")
+    Iterable<Map<String, String>> displayData = (Collection<Map<String, String>>) steps.get(1)
+        .getProperties().get("display_data");
+
+    String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName();
+    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
+      hasEntry("namespace", namespace),
+      hasEntry("key", "exceptionType"),
+      hasEntry("value", RuntimeException.class.getName()))));
+
+    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
+        hasEntry("namespace", namespace),
+        hasEntry("key", "exceptionMessage"),
+        hasEntry(is("value"), Matchers.containsString(expectedMessage)))));
+
+    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
+        hasEntry("namespace", namespace),
+        hasEntry("key", "exceptionCause"),
+        hasEntry("value", "foobar"))));
+
+    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
+        hasEntry("namespace", namespace),
+        hasEntry("key", "stackTrace"))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58fa1556/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index fa8c0e9..dc6e381 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -72,6 +72,10 @@ public class DisplayData implements Serializable {
    * Collect the {@link DisplayData} from a component. This will traverse all subcomponents
    * specified via {@link Builder#include} in the given component. Data in this component will be in
    * a namespace derived from the component.
+   *
+   * <p>Pipeline runners should call this method in order to collect display data. While it should
+   * be safe to call {@code DisplayData.from} on any component which implements it, runners should
+   * be resilient to exceptions thrown while collecting display data.
    */
   public static DisplayData from(HasDisplayData component) {
     checkNotNull(component, "component argument cannot be null");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58fa1556/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
index 85920aa..622abb2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
@@ -194,7 +194,7 @@ public class DatastoreIOTest {
   }
 
   @Test
-  public void testSourceDipslayData() {
+  public void testSourceDisplayData() {
   DatastoreIO.Source source = DatastoreIO.source()
       .withDataset(DATASET)
       .withQuery(QUERY)
@@ -242,7 +242,7 @@ public class DatastoreIOTest {
   }
 
   @Test
-  public void testSinkDipslayData() {
+  public void testSinkDisplayData() {
     DatastoreIO.Sink sink = DatastoreIO.sink()
         .withDataset(DATASET)
         .withHost(HOST);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58fa1556/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 851769a..21b2e33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -39,7 +39,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayData.Item;
 import org.apache.beam.sdk.values.PCollection;
@@ -62,11 +69,13 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -75,8 +84,8 @@ import java.util.regex.Pattern;
  * Tests for {@link DisplayData} class.
  */
 @RunWith(JUnit4.class)
-public class DisplayDataTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
+public class DisplayDataTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
   private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime();
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -958,6 +967,38 @@ public class DisplayDataTest {
         quoted("DisplayDataTest"), "baz", "http://abc"));
   }
 
+  /**
+   * Validate that all runners are resilient to exceptions thrown while retrieving display data.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testRunnersResilientToDisplayDataExceptions() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> pCol = p
+        .apply(Create.of(1, 2, 3))
+        .apply(new IdentityTransform<Integer>() {
+          @Override
+          public void populateDisplayData(Builder builder) {
+            throw new RuntimeException("bug!");
+          }
+        });
+
+    PAssert.that(pCol).containsInAnyOrder(1, 2, 3);
+    p.run();
+  }
+
+  private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      return input.apply(ParDo.of(new DoFn<T, T>() {
+        @Override
+        public void processElement(ProcessContext c) throws Exception {
+          c.output(c.element());
+        }
+      }));
+    }
+  }
+
   private String quoted(Object obj) {
     return String.format("\"%s\"", obj);
   }