You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/17 16:55:39 UTC

[1/4] beam git commit: Simplify and improve exception unwrapping in TestFlinkRunner

Repository: beam
Updated Branches:
  refs/heads/master ef6a5008a -> b672cde11


Simplify and improve exception unwrapping in TestFlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76b7991f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76b7991f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76b7991f

Branch: refs/heads/master
Commit: 76b7991f102e8e724686042be8266778d4eef44f
Parents: ef6a500
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Mar 14 18:23:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:54:11 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/TestFlinkRunner.java     | 39 +++++++-------------
 1 file changed, 13 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/76b7991f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index ef56b55..8f50105 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -56,36 +56,23 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
   public PipelineResult run(Pipeline pipeline) {
     try {
       return delegate.run(pipeline);
-    } catch (Throwable e) {
+    } catch (Throwable t) {
       // Special case hack to pull out assertion errors from PAssert; instead there should
       // probably be a better story along the lines of UserCodeException.
-      Throwable cause = e;
-      Throwable oldCause = e;
-      PipelineExecutionException executionException = null;
-      do {
-
-        // find UserCodeException and throw PipelineExecutionException
-        if (cause instanceof UserCodeException) {
-          executionException = new PipelineExecutionException(cause.getCause());
-        }
-
-        if (cause.getCause() == null) {
-          break;
-        }
-
-        oldCause = cause;
-        cause = cause.getCause();
-
-      } while (!oldCause.equals(cause));
-      if (cause instanceof AssertionError) {
-        throw (AssertionError) cause;
-      } else {
-        if (executionException != null) {
-          throw executionException;
-        } else {
-          throw e;
+      UserCodeException innermostUserCodeException = null;
+      Throwable current = t;
+      for (; current.getCause() != null; current = current.getCause()) {
+        if (current instanceof UserCodeException) {
+          innermostUserCodeException = ((UserCodeException) current);
         }
       }
+      if (innermostUserCodeException != null) {
+        current = innermostUserCodeException.getCause();
+      }
+      if (current instanceof AssertionError) {
+        throw (AssertionError) current;
+      }
+      throw new PipelineExecutionException(current);
     }
   }
 


[4/4] beam git commit: This closes #2247

Posted by tg...@apache.org.
This closes #2247


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b672cde1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b672cde1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b672cde1

Branch: refs/heads/master
Commit: b672cde111b2f11d185479e1d208e63ef6ff322e
Parents: ef6a500 d48c0f3
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 17 09:55:25 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:55:25 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/TestFlinkRunner.java     |  39 ++--
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 209 ++++++++++---------
 .../apache/beam/sdk/testing/PAssertTest.java    |  44 ++++
 4 files changed, 174 insertions(+), 120 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: Capture assertion site in PAssert

Posted by tg...@apache.org.
Capture assertion site in PAssert

This makes PAssert failures quite a bit easier to debug.
Example message after this commit:

java.lang.AssertionError: Some message

  at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:384)
  at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:279)
  at org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestamped(SplittableDoFnTest.java:234)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  ...
  Caused by: java.lang.AssertionError:
  Expected: iterable over [<TimestampedValue(KV{z, 0}, 2017-01-10T00:38:28.000Z)>, <TimestampedValue(KV{bb, 0}, 2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{bb, 1}, 2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{ccccc, 0}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 1}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 2}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 3}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 4}, 2017-01-10T00:38:30.000Z)>] in any order
  but: Not matched: <TimestampedValue(KV{a, 0}, 2017-01-10T00:38:28.000Z)>
  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
  at org.junit.Assert.assertThat(Assert.java:956)
  at org.junit.Assert.assertThat(Assert.java:923)
  at org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1270)
  at org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1)
  ...

  (as opposed to, basically, just the "Caused by" part)

This reverts commit 47592f66222e0b8a82d4c94d14cfba38044658f4.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d48c0f3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d48c0f3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d48c0f3f

Branch: refs/heads/master
Commit: d48c0f3f12920f4d2bdd5e6a719cd9bc8f20cc5f
Parents: 7b154e6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 15:19:00 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:54:20 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 209 ++++++++++---------
 .../apache/beam/sdk/testing/PAssertTest.java    |  44 ++++
 2 files changed, 160 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d48c0f3f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 2596335..f6a409a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -286,7 +286,7 @@ public class PAssert {
    * with the specified reason.
    */
   public static <T> IterableAssert<T> that(String reason, PCollection<T> actual) {
-    return new PCollectionContentsAssert<>(reason, actual);
+    return new PCollectionContentsAssert<>(actual, PAssertionSite.capture(reason));
   }
 
   /**
@@ -316,7 +316,8 @@ public class PAssert {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
 
-    return new PCollectionSingletonIterableAssert<>(reason, actualIterables);
+    return new PCollectionSingletonIterableAssert<>(
+        actualIterables, PAssertionSite.capture(reason));
   }
 
   /**
@@ -333,7 +334,9 @@ public class PAssert {
    * a singleton.
    */
   public static <T> SingletonAssert<T> thatSingleton(String reason, PCollection<T> actual) {
-    return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder(), reason);
+    return new PCollectionViewAssert<>(
+        actual, View.<T>asSingleton(), actual.getCoder(), PAssertionSite.capture(reason)
+    );
   }
 
   /**
@@ -362,7 +365,7 @@ public class PAssert {
         actual,
         View.<K, V>asMultimap(),
         MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())),
-        reason);
+        PAssertionSite.capture(reason));
   }
 
   /**
@@ -388,12 +391,36 @@ public class PAssert {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
     return new PCollectionViewAssert<>(
-        actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()),
-        reason);
+        actual,
+        View.<K, V>asMap(),
+        MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()),
+        PAssertionSite.capture(reason));
   }
 
   ////////////////////////////////////////////////////////////
 
+  private static class PAssertionSite implements Serializable {
+    private final String message;
+    private final StackTraceElement[] creationStackTrace;
+
+    static PAssertionSite capture(String message) {
+      return new PAssertionSite(message, new Throwable().getStackTrace());
+    }
+
+    PAssertionSite(String message, StackTraceElement[] creationStackTrace) {
+      this.message = message;
+      this.creationStackTrace = creationStackTrace;
+    }
+
+    public AssertionError wrap(Throwable t) {
+      AssertionError res =
+          new AssertionError(
+              message.isEmpty() ? t.getMessage() : (message + ": " + t.getMessage()), t);
+      res.setStackTrace(creationStackTrace);
+      return res;
+    }
+  }
+
   /**
    * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
    * the runner to support side inputs.
@@ -402,21 +429,21 @@ public class PAssert {
     private final PCollection<T> actual;
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor;
-    private final String reason;
+    private final PAssertionSite site;
 
-    public PCollectionContentsAssert(String reason, PCollection<T> actual) {
-      this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes(), reason);
+    public PCollectionContentsAssert(PCollection<T> actual, PAssertionSite site) {
+      this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes(), site);
     }
 
     public PCollectionContentsAssert(
         PCollection<T> actual,
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
-        String reason) {
+        PAssertionSite site) {
       this.actual = actual;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
-      this.reason = reason;
+      this.site = site;
     }
 
     @Override
@@ -451,7 +478,7 @@ public class PAssert {
       Coder<BoundedWindow> windowCoder =
           (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
       return new PCollectionContentsAssert<>(
-          actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor, reason);
+          actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor, site);
     }
 
     /**
@@ -472,7 +499,7 @@ public class PAssert {
      */
     @Override
     public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(reason), expectedElements);
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
     }
 
     @Override
@@ -486,7 +513,7 @@ public class PAssert {
         SerializableFunction<Iterable<T>, Void> checkerFn) {
       actual.apply(
           nextAssertionName(),
-          new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor));
+          new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site));
       return this;
     }
 
@@ -525,25 +552,23 @@ public class PAssert {
       // more flexible bounds.
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
-          (SerializableFunction) new MatcherCheckerFn<>(reason, matcher);
+          (SerializableFunction) new MatcherCheckerFn<>(matcher);
       actual.apply(
           "PAssert$" + (assertCount++),
-          new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor));
+          new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site));
       return this;
     }
 
     private static class MatcherCheckerFn<T> implements SerializableFunction<T, Void> {
-      private final String reason;
-      private final SerializableMatcher<T> matcher;
+      private SerializableMatcher<T> matcher;
 
-      public MatcherCheckerFn(String reason, SerializableMatcher<T> matcher) {
-        this.reason = reason;
+      public MatcherCheckerFn(SerializableMatcher<T> matcher) {
         this.matcher = matcher;
       }
 
       @Override
       public Void apply(T actual) {
-        assertThat(reason, actual, matcher);
+        assertThat(actual, matcher);
         return null;
       }
     }
@@ -582,11 +607,12 @@ public class PAssert {
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
         paneExtractor;
-    private final String reason;
+    private final PAssertionSite site;
 
-    public PCollectionSingletonIterableAssert(String reason, PCollection<Iterable<T>> actual) {
-      this(actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane(),
-          reason);
+    public PCollectionSingletonIterableAssert(
+        PCollection<Iterable<T>> actual, PAssertionSite site) {
+      this(
+          actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane(), site);
     }
 
     public PCollectionSingletonIterableAssert(
@@ -594,7 +620,7 @@ public class PAssert {
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
             paneExtractor,
-        String reason) {
+        PAssertionSite site) {
       this.actual = actual;
 
       @SuppressWarnings("unchecked")
@@ -603,7 +629,7 @@ public class PAssert {
 
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
-      this.reason = reason;
+      this.site = site;
     }
 
     @Override
@@ -639,7 +665,7 @@ public class PAssert {
       Coder<BoundedWindow> windowCoder =
           (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
       return new PCollectionSingletonIterableAssert<>(
-          actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor, reason);
+          actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor, site);
     }
 
     @Override
@@ -655,7 +681,7 @@ public class PAssert {
 
     @Override
     public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(reason), expectedElements);
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
     }
 
     @Override
@@ -663,7 +689,7 @@ public class PAssert {
         SerializableFunction<Iterable<T>, Void> checkerFn) {
       actual.apply(
           "PAssert$" + (assertCount++),
-          new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor));
+          new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor, site));
       return this;
     }
 
@@ -686,15 +712,16 @@ public class PAssert {
     private final SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>>
         paneExtractor;
     private final Coder<ViewT> coder;
-    private final String reason;
+    private final PAssertionSite site;
 
     protected PCollectionViewAssert(
         PCollection<ElemT> actual,
         PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
         Coder<ViewT> coder,
-        String reason) {
-      this(actual, view, IntoGlobalWindow.<ElemT>of(), PaneExtractors.<ElemT>onlyPane(), coder,
-          reason);
+        PAssertionSite site) {
+      this(
+          actual, view, IntoGlobalWindow.<ElemT>of(), PaneExtractors.<ElemT>onlyPane(), coder, site
+      );
     }
 
     private PCollectionViewAssert(
@@ -703,13 +730,13 @@ public class PAssert {
         AssertionWindows rewindowActuals,
         SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor,
         Coder<ViewT> coder,
-        String reason) {
+        PAssertionSite site) {
       this.actual = actual;
       this.view = view;
       this.rewindowActuals = rewindowActuals;
       this.paneExtractor = paneExtractor;
       this.coder = coder;
-      this.reason = reason;
+      this.site = site;
     }
 
     @Override
@@ -737,17 +764,17 @@ public class PAssert {
               (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(), window),
           paneExtractor,
           coder,
-          reason);
+          site);
     }
 
     @Override
     public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
-      return satisfies(new AssertIsEqualToRelation<ViewT>(reason), expectedValue);
+      return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
     }
 
     @Override
     public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
-      return satisfies(new AssertNotEqualToRelation<ViewT>(reason), expectedValue);
+      return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue);
     }
 
     @Override
@@ -760,7 +787,8 @@ public class PAssert {
               new OneSideInputAssert<ViewT>(
                   CreateActual.from(actual, rewindowActuals, paneExtractor, view),
                   rewindowActuals.<Integer>windowDummy(),
-                  checkerFn));
+                  checkerFn,
+                  site));
       return this;
     }
 
@@ -983,14 +1011,17 @@ public class PAssert {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor;
+    private final PAssertionSite site;
 
     private GroupThenAssert(
         SerializableFunction<Iterable<T>, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
-        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) {
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
+        PAssertionSite site) {
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
+      this.site = site;
     }
 
     @Override
@@ -999,7 +1030,7 @@ public class PAssert {
           .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
           .setCoder(IterableCoder.of(input.getCoder()))
-          .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
+          .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn, site)));
 
       return PDone.in(input.getPipeline());
     }
@@ -1015,15 +1046,18 @@ public class PAssert {
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
         paneExtractor;
+    private final PAssertionSite site;
 
     private GroupThenAssertForSingleton(
         SerializableFunction<Iterable<T>, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
-            paneExtractor) {
+            paneExtractor,
+        PAssertionSite site) {
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
+      this.site = site;
     }
 
     @Override
@@ -1032,7 +1066,7 @@ public class PAssert {
           .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
           .setCoder(IterableCoder.of(input.getCoder()))
-          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
+          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn, site)));
 
       return PDone.in(input.getPipeline());
     }
@@ -1053,14 +1087,17 @@ public class PAssert {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
     private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken;
     private final SerializableFunction<ActualT, Void> checkerFn;
+    private final PAssertionSite site;
 
     private OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
         PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
-        SerializableFunction<ActualT, Void> checkerFn) {
+        SerializableFunction<ActualT, Void> checkerFn,
+        PAssertionSite site) {
       this.createActual = createActual;
       this.windowToken = windowToken;
       this.checkerFn = checkerFn;
+      this.site = site;
     }
 
     @Override
@@ -1072,7 +1109,7 @@ public class PAssert {
           .apply("WindowToken", windowToken)
           .apply(
               "RunChecks",
-              ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
+              ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual, site)));
 
       return PDone.in(input.getPipeline());
     }
@@ -1092,17 +1129,21 @@ public class PAssert {
     private final Aggregator<Integer, Integer> failure =
         createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
     private final PCollectionView<ActualT> actual;
+    private final PAssertionSite site;
 
     private SideInputCheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
+        SerializableFunction<ActualT, Void> checkerFn,
+        PCollectionView<ActualT> actual,
+        PAssertionSite site) {
       this.checkerFn = checkerFn;
       this.actual = actual;
+      this.site = site;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       ActualT actualContents = c.sideInput(actual);
-      doChecks(actualContents, checkerFn, success, failure);
+      doChecks(site, actualContents, checkerFn, success, failure);
     }
   }
 
@@ -1119,14 +1160,17 @@ public class PAssert {
         createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
         createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
+    private final PAssertionSite site;
 
-    private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+    private GroupedValuesCheckerDoFn(
+        SerializableFunction<ActualT, Void> checkerFn, PAssertionSite site) {
       this.checkerFn = checkerFn;
+      this.site = site;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      doChecks(c.element(), checkerFn, success, failure);
+      doChecks(site, c.element(), checkerFn, success, failure);
     }
   }
 
@@ -1144,19 +1188,23 @@ public class PAssert {
         createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
         createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
+    private final PAssertionSite site;
 
-    private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+    private SingletonCheckerDoFn(
+        SerializableFunction<ActualT, Void> checkerFn, PAssertionSite site) {
       this.checkerFn = checkerFn;
+      this.site = site;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       ActualT actualContents = Iterables.getOnlyElement(c.element());
-      doChecks(actualContents, checkerFn, success, failure);
+      doChecks(site, actualContents, checkerFn, success, failure);
     }
   }
 
   private static <ActualT> void doChecks(
+      PAssertionSite site,
       ActualT actualContents,
       SerializableFunction<ActualT, Void> checkerFn,
       Aggregator<Integer, Integer> successAggregator,
@@ -1165,9 +1213,8 @@ public class PAssert {
       checkerFn.apply(actualContents);
       successAggregator.addValue(1);
     } catch (Throwable t) {
-      LOG.error("PAssert failed expectations.", t);
       failureAggregator.addValue(1);
-      throw t;
+      throw site.wrap(t);
     }
   }
 
@@ -1178,17 +1225,15 @@ public class PAssert {
    * value.
    */
   private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
-    private final String reason;
-    private final T expected;
+    private T expected;
 
-    public AssertIsEqualTo(String reason, T expected) {
-      this.reason = reason;
+    public AssertIsEqualTo(T expected) {
       this.expected = expected;
     }
 
     @Override
     public Void apply(T actual) {
-      assertThat(reason, actual, equalTo(expected));
+      assertThat(actual, equalTo(expected));
       return null;
     }
   }
@@ -1198,17 +1243,15 @@ public class PAssert {
    * value.
    */
   private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
-    private String reason;
     private T expected;
 
-    public AssertNotEqualTo(String reason, T expected) {
-      this.reason = reason;
+    public AssertNotEqualTo(T expected) {
       this.expected = expected;
     }
 
     @Override
     public Void apply(T actual) {
-      assertThat(reason, actual, not(equalTo(expected)));
+      assertThat(actual, not(equalTo(expected)));
       return null;
     }
   }
@@ -1219,27 +1262,25 @@ public class PAssert {
    */
   private static class AssertContainsInAnyOrder<T>
       implements SerializableFunction<Iterable<T>, Void> {
-    private final String reason;
-    private final T[] expected;
+    private T[] expected;
 
     @SafeVarargs
-    public AssertContainsInAnyOrder(String reason, T... expected) {
-      this.reason = reason;
+    public AssertContainsInAnyOrder(T... expected) {
       this.expected = expected;
     }
 
     @SuppressWarnings("unchecked")
-    public AssertContainsInAnyOrder(String reason, Collection<T> expected) {
-      this(reason, (T[]) expected.toArray());
+    public AssertContainsInAnyOrder(Collection<T> expected) {
+      this((T[]) expected.toArray());
     }
 
-    public AssertContainsInAnyOrder(String reason, Iterable<T> expected) {
-      this(reason, Lists.<T>newArrayList(expected));
+    public AssertContainsInAnyOrder(Iterable<T> expected) {
+      this(Lists.<T>newArrayList(expected));
     }
 
     @Override
     public Void apply(Iterable<T> actual) {
-      assertThat(reason, actual, containsInAnyOrder(expected));
+      assertThat(actual, containsInAnyOrder(expected));
       return null;
     }
   }
@@ -1259,15 +1300,9 @@ public class PAssert {
    * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
    */
   private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
-    private final String reason;
-
-    public AssertIsEqualToRelation(String reason) {
-      this.reason = reason;
-    }
-
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
-      return new AssertIsEqualTo<T>(reason, expected);
+      return new AssertIsEqualTo<T>(expected);
     }
   }
 
@@ -1275,15 +1310,9 @@ public class PAssert {
    * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
    */
   private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
-    private final String reason;
-
-    public AssertNotEqualToRelation(String reason) {
-      this.reason = reason;
-    }
-
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
-      return new AssertNotEqualTo<T>(reason, expected);
+      return new AssertNotEqualTo<T>(expected);
     }
   }
 
@@ -1293,15 +1322,9 @@ public class PAssert {
    */
   private static class AssertContainsInAnyOrderRelation<T>
       implements AssertRelation<Iterable<T>, Iterable<T>> {
-    private final String reason;
-
-    public AssertContainsInAnyOrderRelation(String reason) {
-      this.reason = reason;
-    }
-
     @Override
     public SerializableFunction<Iterable<T>, Void> assertFor(Iterable<T> expectedElements) {
-      return new AssertContainsInAnyOrder<T>(reason, expectedElements);
+      return new AssertContainsInAnyOrder<T>(expectedElements);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d48c0f3f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 1603db5..dab457a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
@@ -450,6 +451,49 @@ public class PAssertTest implements Serializable {
     assertThat(message, containsString("Expected: iterable over [] in any order"));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testAssertionSiteIsCapturedWithMessage() throws Exception {
+    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    assertThatCollectionIsEmptyWithMessage(vals);
+
+    Throwable thrown = runExpectingAssertionFailure(pipeline);
+
+    assertThat(
+        thrown.getMessage(),
+        containsString("Should be empty"));
+    assertThat(
+        thrown.getMessage(),
+        containsString("Expected: iterable over [] in any order"));
+    String stacktrace = Throwables.getStackTraceAsString(thrown);
+    assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithMessage"));
+    assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithMessage"));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testAssertionSiteIsCapturedWithoutMessage() throws Exception {
+    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    assertThatCollectionIsEmptyWithoutMessage(vals);
+
+    Throwable thrown = runExpectingAssertionFailure(pipeline);
+
+    assertThat(
+        thrown.getMessage(),
+        containsString("Expected: iterable over [] in any order"));
+    String stacktrace = Throwables.getStackTraceAsString(thrown);
+    assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithoutMessage"));
+    assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithoutMessage"));
+  }
+
+  private static void assertThatCollectionIsEmptyWithMessage(PCollection<Long> vals) {
+    PAssert.that("Should be empty", vals).empty();
+  }
+
+  private static void assertThatCollectionIsEmptyWithoutMessage(PCollection<Long> vals) {
+    PAssert.that(vals).empty();
+  }
+
   private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
     // We cannot use thrown.expect(AssertionError.class) because the AssertionError
     // is first caught by JUnit and causes a test failure.


[2/4] beam git commit: Bump dataflow.container_version to 0314

Posted by tg...@apache.org.
Bump dataflow.container_version to 0314


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b154e6c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b154e6c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b154e6c

Branch: refs/heads/master
Commit: 7b154e6cac58f6e95e2d0e16bcb5ffe09eb63611
Parents: 76b7991
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Mar 14 14:31:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:54:20 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b154e6c/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index ff79681..255c326 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170307</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170314</dataflow.container_version>
     <dataflow.environment_major_version>6</dataflow.environment_major_version>
   </properties>