You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/05/24 22:52:56 UTC

[1/2] incubator-beam git commit: Evaluate display data from InProcessPipelineRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5535fc3fd -> ffed1d479


Evaluate display data from InProcessPipelineRunner

Display data can be added to any PTransform to be used
for display from any runner. Runners are not required to
consume display data, and currently many don't.

This changes InProcessPipelineRunner to consume display data (and then
discard it) in order to validate that display data is properly
implemented on transforms within a pipeline. Exceptions thrown
within HasDisplayData implementations will cause Pipeline.run()
to fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac98c4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac98c4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac98c4a0

Branch: refs/heads/master
Commit: ac98c4a01f7e4b2bba3e99271b66e141e7555de8
Parents: 5535fc3
Author: Scott Wegner <sw...@google.com>
Authored: Mon May 23 14:29:33 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue May 24 14:11:44 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DisplayDataValidator.java    | 67 ++++++++++++++++++++
 .../runners/direct/InProcessPipelineRunner.java |  2 +
 .../direct/InProcessPipelineRunnerTest.java     | 63 ++++++++++++++++++
 .../sdk/options/ProxyInvocationHandler.java     |  7 +-
 .../sdk/transforms/display/DisplayData.java     | 14 +++-
 .../sdk/transforms/display/DisplayDataTest.java | 19 ++++++
 6 files changed, 168 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac98c4a0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
new file mode 100644
index 0000000..e09fe62
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
+/**
+ * Validate correct implementation of {@link DisplayData} by evaluating
+ * {@link HasDisplayData#populateDisplayData(DisplayData.Builder)} during pipeline construction.
+ */
+class DisplayDataValidator {
+  // Do not instantiate
+  private DisplayDataValidator() {}
+
+  static void validatePipeline(Pipeline pipeline) {
+    validateOptions(pipeline);
+    validateTransforms(pipeline);
+  }
+
+  private static void validateOptions(Pipeline pipeline) {
+    evaluateDisplayData(pipeline.getOptions());
+  }
+
+  private static void validateTransforms(Pipeline pipeline) {
+    pipeline.traverseTopologically(Visitor.INSTANCE);
+  }
+
+  private static void evaluateDisplayData(HasDisplayData component) {
+    DisplayData.from(component);
+  }
+
+  private static class Visitor extends Pipeline.PipelineVisitor.Defaults {
+    private static final Visitor INSTANCE = new Visitor();
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+      if (!node.isRootNode()) {
+        evaluateDisplayData(node.getTransform());
+      }
+
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(TransformTreeNode node) {
+      evaluateDisplayData(node.getTransform());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac98c4a0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
index a7f6941..5a04af4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -222,6 +222,8 @@ public class InProcessPipelineRunner
                 GroupByKey.class, InProcessGroupByKeyOnly.class));
     pipeline.traverseTopologically(keyedPValueVisitor);
 
+    DisplayDataValidator.validatePipeline(pipeline);
+
     InProcessEvaluationContext context =
         InProcessEvaluationContext.create(
             getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac98c4a0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 87db39a..e403019 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.is;
+
 import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -24,12 +26,19 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -40,6 +49,8 @@ import java.io.Serializable;
  */
 @RunWith(JUnit4.class)
 public class InProcessPipelineRunnerTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void wordCountShouldSucceed() throws Throwable {
     Pipeline p = getPipeline();
@@ -68,6 +79,58 @@ public class InProcessPipelineRunnerTest implements Serializable {
     result.awaitCompletion();
   }
 
+  @Test
+  public void transformDisplayDataExceptionShouldFail() {
+    DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {}
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        throw new RuntimeException("oh noes!");
+      }
+    };
+
+    Pipeline p = getPipeline();
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(brokenDoFn));
+
+    thrown.expectMessage(brokenDoFn.getClass().getName());
+    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!")));
+    p.run();
+  }
+
+  @Test
+  public void pipelineOptionsDisplayDataExceptionShouldFail() {
+    Object brokenValueType = new Object() {
+      @JsonValue
+      public int getValue () {
+        return 42;
+      }
+
+      @Override
+      public String toString() {
+        throw new RuntimeException("oh noes!!");
+      }
+    };
+
+    Pipeline p = getPipeline();
+    p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
+
+    p.apply(Create.of(1, 2, 3));
+
+    thrown.expectMessage(PipelineOptions.class.getName());
+    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
+    p.run();
+  }
+
+  interface ObjectPipelineOptions extends PipelineOptions {
+    Object getValue();
+    void setValue(Object value);
+  }
+
+
   private Pipeline getPipeline() {
     PipelineOptions opts = PipelineOptionsFactory.create();
     opts.setRunner(InProcessPipelineRunner.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac98c4a0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 3292a7f..a0c4ea3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -84,7 +84,7 @@ import javax.annotation.concurrent.ThreadSafe;
  * {@link PipelineOptions#as(Class)}.
  */
 @ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler {
+class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   /**
    * No two instances of this class are considered equivalent hence we generate a random hash code
@@ -141,7 +141,8 @@ class ProxyInvocationHandler implements InvocationHandler {
         && args[0] instanceof DisplayData.Builder) {
       @SuppressWarnings("unchecked")
       DisplayData.Builder builder = (DisplayData.Builder) args[0];
-      populateDisplayData(builder);
+      // Explicitly set display data namespace so thrown exceptions will have sensible type.
+      builder.include(this, PipelineOptions.class);
       return Void.TYPE;
     }
     String methodName = method.getName();
@@ -266,7 +267,7 @@ class ProxyInvocationHandler implements InvocationHandler {
    * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
    * pipeline options will be added as display data.
    */
-  private void populateDisplayData(DisplayData.Builder builder) {
+  public void populateDisplayData(DisplayData.Builder builder) {
     Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
     Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac98c4a0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 9e9bdbf..59a2cf4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -602,10 +602,13 @@ public class DisplayData implements Serializable {
 
         try {
           subComponent.populateDisplayData(this);
+        } catch (PopulateDisplayDataException e) {
+          // Don't re-wrap exceptions recursively.
+          throw e;
         } catch (Throwable e) {
           String msg = String.format("Error while populating display data for component: %s",
               namespace);
-          throw new RuntimeException(msg, e);
+          throw new PopulateDisplayDataException(msg, e);
         }
 
         this.latestNs = prevNs;
@@ -614,6 +617,15 @@ public class DisplayData implements Serializable {
       return this;
     }
 
+    /**
+     * Marker exception for display data population failures, so they are not re-wrapped.
+     */
+    private static class PopulateDisplayDataException extends RuntimeException {
+      PopulateDisplayDataException(String message, Throwable cause) {
+        super(message, cause);
+      }
+    }
+
     @Override
     public Builder add(Item<?> item) {
       checkNotNull(item, "Input display item cannot be null");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac98c4a0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 478724b..88973ff 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -1026,6 +1026,25 @@ public class DisplayDataTest implements Serializable {
     DisplayData.from(component);
   }
 
+  @Test
+  public void testExceptionsNotWrappedRecursively() {
+    final RuntimeException cause = new RuntimeException("oh noes!");
+    HasDisplayData component = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.include(new HasDisplayData() {
+          @Override
+          public void populateDisplayData(Builder builder) {
+            throw cause;
+          }
+        });
+      }
+    };
+
+    thrown.expectCause(is(cause));
+    DisplayData.from(component);
+  }
+
   private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
     public PCollection<T> apply(PCollection<T> input) {


[2/2] incubator-beam git commit: This closes #375

Posted by bc...@apache.org.
This closes #375


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ffed1d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ffed1d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ffed1d47

Branch: refs/heads/master
Commit: ffed1d4791f8260a60d4535d754e7e5cd69157d7
Parents: 5535fc3 ac98c4a
Author: bchambers <bc...@google.com>
Authored: Tue May 24 14:11:48 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue May 24 14:11:48 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DisplayDataValidator.java    | 67 ++++++++++++++++++++
 .../runners/direct/InProcessPipelineRunner.java |  2 +
 .../direct/InProcessPipelineRunnerTest.java     | 63 ++++++++++++++++++
 .../sdk/options/ProxyInvocationHandler.java     |  7 +-
 .../sdk/transforms/display/DisplayData.java     | 14 +++-
 .../sdk/transforms/display/DisplayDataTest.java | 19 ++++++
 6 files changed, 168 insertions(+), 4 deletions(-)
----------------------------------------------------------------------