You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/17 16:55:39 UTC
[1/4] beam git commit: Simplify and improve exception unwrapping in TestFlinkRunner
Repository: beam
Updated Branches:
refs/heads/master ef6a5008a -> b672cde11
Simplify and improve exception unwrapping in TestFlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76b7991f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76b7991f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76b7991f
Branch: refs/heads/master
Commit: 76b7991f102e8e724686042be8266778d4eef44f
Parents: ef6a500
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Mar 14 18:23:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:54:11 2017 -0700
----------------------------------------------------------------------
.../beam/runners/flink/TestFlinkRunner.java | 39 +++++++-------------
1 file changed, 13 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/76b7991f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index ef56b55..8f50105 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -56,36 +56,23 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
public PipelineResult run(Pipeline pipeline) {
try {
return delegate.run(pipeline);
- } catch (Throwable e) {
+ } catch (Throwable t) {
// Special case hack to pull out assertion errors from PAssert; instead there should
// probably be a better story along the lines of UserCodeException.
- Throwable cause = e;
- Throwable oldCause = e;
- PipelineExecutionException executionException = null;
- do {
-
- // find UserCodeException and throw PipelineExecutionException
- if (cause instanceof UserCodeException) {
- executionException = new PipelineExecutionException(cause.getCause());
- }
-
- if (cause.getCause() == null) {
- break;
- }
-
- oldCause = cause;
- cause = cause.getCause();
-
- } while (!oldCause.equals(cause));
- if (cause instanceof AssertionError) {
- throw (AssertionError) cause;
- } else {
- if (executionException != null) {
- throw executionException;
- } else {
- throw e;
+ UserCodeException innermostUserCodeException = null;
+ Throwable current = t;
+ for (; current.getCause() != null; current = current.getCause()) {
+ if (current instanceof UserCodeException) {
+ innermostUserCodeException = ((UserCodeException) current);
}
}
+ if (innermostUserCodeException != null) {
+ current = innermostUserCodeException.getCause();
+ }
+ if (current instanceof AssertionError) {
+ throw (AssertionError) current;
+ }
+ throw new PipelineExecutionException(current);
}
}
[4/4] beam git commit: This closes #2247
Posted by tg...@apache.org.
This closes #2247
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b672cde1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b672cde1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b672cde1
Branch: refs/heads/master
Commit: b672cde111b2f11d185479e1d208e63ef6ff322e
Parents: ef6a500 d48c0f3
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 17 09:55:25 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:55:25 2017 -0700
----------------------------------------------------------------------
.../beam/runners/flink/TestFlinkRunner.java | 39 ++--
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../org/apache/beam/sdk/testing/PAssert.java | 209 ++++++++++---------
.../apache/beam/sdk/testing/PAssertTest.java | 44 ++++
4 files changed, 174 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
[3/4] beam git commit: Capture assertion site in PAssert
Posted by tg...@apache.org.
Capture assertion site in PAssert
This makes PAssert failures quite a bit easier to debug.
Example message after this commit:
java.lang.AssertionError: Some message
at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:384)
at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:279)
at org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestamped(SplittableDoFnTest.java:234)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.AssertionError:
Expected: iterable over [<TimestampedValue(KV{z, 0}, 2017-01-10T00:38:28.000Z)>, <TimestampedValue(KV{bb, 0}, 2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{bb, 1}, 2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{ccccc, 0}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 1}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 2}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 3}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 4}, 2017-01-10T00:38:30.000Z)>] in any order
but: Not matched: <TimestampedValue(KV{a, 0}, 2017-01-10T00:38:28.000Z)>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.junit.Assert.assertThat(Assert.java:956)
at org.junit.Assert.assertThat(Assert.java:923)
at org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1270)
at org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1)
...
(Before this commit, the failure message consisted of essentially just the "Caused by" part.)
This reverts commit 47592f66222e0b8a82d4c94d14cfba38044658f4.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d48c0f3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d48c0f3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d48c0f3f
Branch: refs/heads/master
Commit: d48c0f3f12920f4d2bdd5e6a719cd9bc8f20cc5f
Parents: 7b154e6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 15:19:00 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:54:20 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 209 ++++++++++---------
.../apache/beam/sdk/testing/PAssertTest.java | 44 ++++
2 files changed, 160 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d48c0f3f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 2596335..f6a409a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -286,7 +286,7 @@ public class PAssert {
* with the specified reason.
*/
public static <T> IterableAssert<T> that(String reason, PCollection<T> actual) {
- return new PCollectionContentsAssert<>(reason, actual);
+ return new PCollectionContentsAssert<>(actual, PAssertionSite.capture(reason));
}
/**
@@ -316,7 +316,8 @@ public class PAssert {
@SuppressWarnings("unchecked") // Safe covariant cast
PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
- return new PCollectionSingletonIterableAssert<>(reason, actualIterables);
+ return new PCollectionSingletonIterableAssert<>(
+ actualIterables, PAssertionSite.capture(reason));
}
/**
@@ -333,7 +334,9 @@ public class PAssert {
* a singleton.
*/
public static <T> SingletonAssert<T> thatSingleton(String reason, PCollection<T> actual) {
- return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder(), reason);
+ return new PCollectionViewAssert<>(
+ actual, View.<T>asSingleton(), actual.getCoder(), PAssertionSite.capture(reason)
+ );
}
/**
@@ -362,7 +365,7 @@ public class PAssert {
actual,
View.<K, V>asMultimap(),
MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())),
- reason);
+ PAssertionSite.capture(reason));
}
/**
@@ -388,12 +391,36 @@ public class PAssert {
@SuppressWarnings("unchecked")
KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
return new PCollectionViewAssert<>(
- actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()),
- reason);
+ actual,
+ View.<K, V>asMap(),
+ MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()),
+ PAssertionSite.capture(reason));
}
////////////////////////////////////////////////////////////
+ private static class PAssertionSite implements Serializable {
+ private final String message;
+ private final StackTraceElement[] creationStackTrace;
+
+ static PAssertionSite capture(String message) {
+ return new PAssertionSite(message, new Throwable().getStackTrace());
+ }
+
+ PAssertionSite(String message, StackTraceElement[] creationStackTrace) {
+ this.message = message;
+ this.creationStackTrace = creationStackTrace;
+ }
+
+ public AssertionError wrap(Throwable t) {
+ AssertionError res =
+ new AssertionError(
+ message.isEmpty() ? t.getMessage() : (message + ": " + t.getMessage()), t);
+ res.setStackTrace(creationStackTrace);
+ return res;
+ }
+ }
+
/**
* An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
* the runner to support side inputs.
@@ -402,21 +429,21 @@ public class PAssert {
private final PCollection<T> actual;
private final AssertionWindows rewindowingStrategy;
private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor;
- private final String reason;
+ private final PAssertionSite site;
- public PCollectionContentsAssert(String reason, PCollection<T> actual) {
- this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes(), reason);
+ public PCollectionContentsAssert(PCollection<T> actual, PAssertionSite site) {
+ this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes(), site);
}
public PCollectionContentsAssert(
PCollection<T> actual,
AssertionWindows rewindowingStrategy,
SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
- String reason) {
+ PAssertionSite site) {
this.actual = actual;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
- this.reason = reason;
+ this.site = site;
}
@Override
@@ -451,7 +478,7 @@ public class PAssert {
Coder<BoundedWindow> windowCoder =
(Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
return new PCollectionContentsAssert<>(
- actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor, reason);
+ actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor, site);
}
/**
@@ -472,7 +499,7 @@ public class PAssert {
*/
@Override
public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
- return satisfies(new AssertContainsInAnyOrderRelation<T>(reason), expectedElements);
+ return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
}
@Override
@@ -486,7 +513,7 @@ public class PAssert {
SerializableFunction<Iterable<T>, Void> checkerFn) {
actual.apply(
nextAssertionName(),
- new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor));
+ new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site));
return this;
}
@@ -525,25 +552,23 @@ public class PAssert {
// more flexible bounds.
@SuppressWarnings({"rawtypes", "unchecked"})
SerializableFunction<Iterable<T>, Void> checkerFn =
- (SerializableFunction) new MatcherCheckerFn<>(reason, matcher);
+ (SerializableFunction) new MatcherCheckerFn<>(matcher);
actual.apply(
"PAssert$" + (assertCount++),
- new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor));
+ new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site));
return this;
}
private static class MatcherCheckerFn<T> implements SerializableFunction<T, Void> {
- private final String reason;
- private final SerializableMatcher<T> matcher;
+ private SerializableMatcher<T> matcher;
- public MatcherCheckerFn(String reason, SerializableMatcher<T> matcher) {
- this.reason = reason;
+ public MatcherCheckerFn(SerializableMatcher<T> matcher) {
this.matcher = matcher;
}
@Override
public Void apply(T actual) {
- assertThat(reason, actual, matcher);
+ assertThat(actual, matcher);
return null;
}
}
@@ -582,11 +607,12 @@ public class PAssert {
private final AssertionWindows rewindowingStrategy;
private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
paneExtractor;
- private final String reason;
+ private final PAssertionSite site;
- public PCollectionSingletonIterableAssert(String reason, PCollection<Iterable<T>> actual) {
- this(actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane(),
- reason);
+ public PCollectionSingletonIterableAssert(
+ PCollection<Iterable<T>> actual, PAssertionSite site) {
+ this(
+ actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane(), site);
}
public PCollectionSingletonIterableAssert(
@@ -594,7 +620,7 @@ public class PAssert {
AssertionWindows rewindowingStrategy,
SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
paneExtractor,
- String reason) {
+ PAssertionSite site) {
this.actual = actual;
@SuppressWarnings("unchecked")
@@ -603,7 +629,7 @@ public class PAssert {
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
- this.reason = reason;
+ this.site = site;
}
@Override
@@ -639,7 +665,7 @@ public class PAssert {
Coder<BoundedWindow> windowCoder =
(Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
return new PCollectionSingletonIterableAssert<>(
- actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor, reason);
+ actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor, site);
}
@Override
@@ -655,7 +681,7 @@ public class PAssert {
@Override
public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
- return satisfies(new AssertContainsInAnyOrderRelation<T>(reason), expectedElements);
+ return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
}
@Override
@@ -663,7 +689,7 @@ public class PAssert {
SerializableFunction<Iterable<T>, Void> checkerFn) {
actual.apply(
"PAssert$" + (assertCount++),
- new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor));
+ new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor, site));
return this;
}
@@ -686,15 +712,16 @@ public class PAssert {
private final SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>>
paneExtractor;
private final Coder<ViewT> coder;
- private final String reason;
+ private final PAssertionSite site;
protected PCollectionViewAssert(
PCollection<ElemT> actual,
PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
Coder<ViewT> coder,
- String reason) {
- this(actual, view, IntoGlobalWindow.<ElemT>of(), PaneExtractors.<ElemT>onlyPane(), coder,
- reason);
+ PAssertionSite site) {
+ this(
+ actual, view, IntoGlobalWindow.<ElemT>of(), PaneExtractors.<ElemT>onlyPane(), coder, site
+ );
}
private PCollectionViewAssert(
@@ -703,13 +730,13 @@ public class PAssert {
AssertionWindows rewindowActuals,
SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor,
Coder<ViewT> coder,
- String reason) {
+ PAssertionSite site) {
this.actual = actual;
this.view = view;
this.rewindowActuals = rewindowActuals;
this.paneExtractor = paneExtractor;
this.coder = coder;
- this.reason = reason;
+ this.site = site;
}
@Override
@@ -737,17 +764,17 @@ public class PAssert {
(Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(), window),
paneExtractor,
coder,
- reason);
+ site);
}
@Override
public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
- return satisfies(new AssertIsEqualToRelation<ViewT>(reason), expectedValue);
+ return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
}
@Override
public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
- return satisfies(new AssertNotEqualToRelation<ViewT>(reason), expectedValue);
+ return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue);
}
@Override
@@ -760,7 +787,8 @@ public class PAssert {
new OneSideInputAssert<ViewT>(
CreateActual.from(actual, rewindowActuals, paneExtractor, view),
rewindowActuals.<Integer>windowDummy(),
- checkerFn));
+ checkerFn,
+ site));
return this;
}
@@ -983,14 +1011,17 @@ public class PAssert {
private final SerializableFunction<Iterable<T>, Void> checkerFn;
private final AssertionWindows rewindowingStrategy;
private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor;
+ private final PAssertionSite site;
private GroupThenAssert(
SerializableFunction<Iterable<T>, Void> checkerFn,
AssertionWindows rewindowingStrategy,
- SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) {
+ SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
+ PAssertionSite site) {
this.checkerFn = checkerFn;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
+ this.site = site;
}
@Override
@@ -999,7 +1030,7 @@ public class PAssert {
.apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
.apply("GetPane", MapElements.via(paneExtractor))
.setCoder(IterableCoder.of(input.getCoder()))
- .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
+ .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn, site)));
return PDone.in(input.getPipeline());
}
@@ -1015,15 +1046,18 @@ public class PAssert {
private final AssertionWindows rewindowingStrategy;
private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
paneExtractor;
+ private final PAssertionSite site;
private GroupThenAssertForSingleton(
SerializableFunction<Iterable<T>, Void> checkerFn,
AssertionWindows rewindowingStrategy,
SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
- paneExtractor) {
+ paneExtractor,
+ PAssertionSite site) {
this.checkerFn = checkerFn;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
+ this.site = site;
}
@Override
@@ -1032,7 +1066,7 @@ public class PAssert {
.apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
.apply("GetPane", MapElements.via(paneExtractor))
.setCoder(IterableCoder.of(input.getCoder()))
- .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
+ .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn, site)));
return PDone.in(input.getPipeline());
}
@@ -1053,14 +1087,17 @@ public class PAssert {
private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken;
private final SerializableFunction<ActualT, Void> checkerFn;
+ private final PAssertionSite site;
private OneSideInputAssert(
PTransform<PBegin, PCollectionView<ActualT>> createActual,
PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
- SerializableFunction<ActualT, Void> checkerFn) {
+ SerializableFunction<ActualT, Void> checkerFn,
+ PAssertionSite site) {
this.createActual = createActual;
this.windowToken = windowToken;
this.checkerFn = checkerFn;
+ this.site = site;
}
@Override
@@ -1072,7 +1109,7 @@ public class PAssert {
.apply("WindowToken", windowToken)
.apply(
"RunChecks",
- ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
+ ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual, site)));
return PDone.in(input.getPipeline());
}
@@ -1092,17 +1129,21 @@ public class PAssert {
private final Aggregator<Integer, Integer> failure =
createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private final PCollectionView<ActualT> actual;
+ private final PAssertionSite site;
private SideInputCheckerDoFn(
- SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
+ SerializableFunction<ActualT, Void> checkerFn,
+ PCollectionView<ActualT> actual,
+ PAssertionSite site) {
this.checkerFn = checkerFn;
this.actual = actual;
+ this.site = site;
}
@ProcessElement
public void processElement(ProcessContext c) {
ActualT actualContents = c.sideInput(actual);
- doChecks(actualContents, checkerFn, success, failure);
+ doChecks(site, actualContents, checkerFn, success, failure);
}
}
@@ -1119,14 +1160,17 @@ public class PAssert {
createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
+ private final PAssertionSite site;
- private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+ private GroupedValuesCheckerDoFn(
+ SerializableFunction<ActualT, Void> checkerFn, PAssertionSite site) {
this.checkerFn = checkerFn;
+ this.site = site;
}
@ProcessElement
public void processElement(ProcessContext c) {
- doChecks(c.element(), checkerFn, success, failure);
+ doChecks(site, c.element(), checkerFn, success, failure);
}
}
@@ -1144,19 +1188,23 @@ public class PAssert {
createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
+ private final PAssertionSite site;
- private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+ private SingletonCheckerDoFn(
+ SerializableFunction<ActualT, Void> checkerFn, PAssertionSite site) {
this.checkerFn = checkerFn;
+ this.site = site;
}
@ProcessElement
public void processElement(ProcessContext c) {
ActualT actualContents = Iterables.getOnlyElement(c.element());
- doChecks(actualContents, checkerFn, success, failure);
+ doChecks(site, actualContents, checkerFn, success, failure);
}
}
private static <ActualT> void doChecks(
+ PAssertionSite site,
ActualT actualContents,
SerializableFunction<ActualT, Void> checkerFn,
Aggregator<Integer, Integer> successAggregator,
@@ -1165,9 +1213,8 @@ public class PAssert {
checkerFn.apply(actualContents);
successAggregator.addValue(1);
} catch (Throwable t) {
- LOG.error("PAssert failed expectations.", t);
failureAggregator.addValue(1);
- throw t;
+ throw site.wrap(t);
}
}
@@ -1178,17 +1225,15 @@ public class PAssert {
* value.
*/
private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
- private final String reason;
- private final T expected;
+ private T expected;
- public AssertIsEqualTo(String reason, T expected) {
- this.reason = reason;
+ public AssertIsEqualTo(T expected) {
this.expected = expected;
}
@Override
public Void apply(T actual) {
- assertThat(reason, actual, equalTo(expected));
+ assertThat(actual, equalTo(expected));
return null;
}
}
@@ -1198,17 +1243,15 @@ public class PAssert {
* value.
*/
private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
- private String reason;
private T expected;
- public AssertNotEqualTo(String reason, T expected) {
- this.reason = reason;
+ public AssertNotEqualTo(T expected) {
this.expected = expected;
}
@Override
public Void apply(T actual) {
- assertThat(reason, actual, not(equalTo(expected)));
+ assertThat(actual, not(equalTo(expected)));
return null;
}
}
@@ -1219,27 +1262,25 @@ public class PAssert {
*/
private static class AssertContainsInAnyOrder<T>
implements SerializableFunction<Iterable<T>, Void> {
- private final String reason;
- private final T[] expected;
+ private T[] expected;
@SafeVarargs
- public AssertContainsInAnyOrder(String reason, T... expected) {
- this.reason = reason;
+ public AssertContainsInAnyOrder(T... expected) {
this.expected = expected;
}
@SuppressWarnings("unchecked")
- public AssertContainsInAnyOrder(String reason, Collection<T> expected) {
- this(reason, (T[]) expected.toArray());
+ public AssertContainsInAnyOrder(Collection<T> expected) {
+ this((T[]) expected.toArray());
}
- public AssertContainsInAnyOrder(String reason, Iterable<T> expected) {
- this(reason, Lists.<T>newArrayList(expected));
+ public AssertContainsInAnyOrder(Iterable<T> expected) {
+ this(Lists.<T>newArrayList(expected));
}
@Override
public Void apply(Iterable<T> actual) {
- assertThat(reason, actual, containsInAnyOrder(expected));
+ assertThat(actual, containsInAnyOrder(expected));
return null;
}
}
@@ -1259,15 +1300,9 @@ public class PAssert {
* An {@link AssertRelation} implementing the binary predicate that two objects are equal.
*/
private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
- private final String reason;
-
- public AssertIsEqualToRelation(String reason) {
- this.reason = reason;
- }
-
@Override
public SerializableFunction<T, Void> assertFor(T expected) {
- return new AssertIsEqualTo<T>(reason, expected);
+ return new AssertIsEqualTo<T>(expected);
}
}
@@ -1275,15 +1310,9 @@ public class PAssert {
* An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
*/
private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
- private final String reason;
-
- public AssertNotEqualToRelation(String reason) {
- this.reason = reason;
- }
-
@Override
public SerializableFunction<T, Void> assertFor(T expected) {
- return new AssertNotEqualTo<T>(reason, expected);
+ return new AssertNotEqualTo<T>(expected);
}
}
@@ -1293,15 +1322,9 @@ public class PAssert {
*/
private static class AssertContainsInAnyOrderRelation<T>
implements AssertRelation<Iterable<T>, Iterable<T>> {
- private final String reason;
-
- public AssertContainsInAnyOrderRelation(String reason) {
- this.reason = reason;
- }
-
@Override
public SerializableFunction<Iterable<T>, Void> assertFor(Iterable<T> expectedElements) {
- return new AssertContainsInAnyOrder<T>(reason, expectedElements);
+ return new AssertContainsInAnyOrder<T>(expectedElements);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d48c0f3f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 1603db5..dab457a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
@@ -450,6 +451,49 @@ public class PAssertTest implements Serializable {
assertThat(message, containsString("Expected: iterable over [] in any order"));
}
+ @Test
+ @Category(RunnableOnService.class)
+ public void testAssertionSiteIsCapturedWithMessage() throws Exception {
+ PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+ assertThatCollectionIsEmptyWithMessage(vals);
+
+ Throwable thrown = runExpectingAssertionFailure(pipeline);
+
+ assertThat(
+ thrown.getMessage(),
+ containsString("Should be empty"));
+ assertThat(
+ thrown.getMessage(),
+ containsString("Expected: iterable over [] in any order"));
+ String stacktrace = Throwables.getStackTraceAsString(thrown);
+ assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithMessage"));
+ assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithMessage"));
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testAssertionSiteIsCapturedWithoutMessage() throws Exception {
+ PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+ assertThatCollectionIsEmptyWithoutMessage(vals);
+
+ Throwable thrown = runExpectingAssertionFailure(pipeline);
+
+ assertThat(
+ thrown.getMessage(),
+ containsString("Expected: iterable over [] in any order"));
+ String stacktrace = Throwables.getStackTraceAsString(thrown);
+ assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithoutMessage"));
+ assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithoutMessage"));
+ }
+
+ private static void assertThatCollectionIsEmptyWithMessage(PCollection<Long> vals) {
+ PAssert.that("Should be empty", vals).empty();
+ }
+
+ private static void assertThatCollectionIsEmptyWithoutMessage(PCollection<Long> vals) {
+ PAssert.that(vals).empty();
+ }
+
private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
// We cannot use thrown.expect(AssertionError.class) because the AssertionError
// is first caught by JUnit and causes a test failure.
[2/4] beam git commit: Bump dataflow.container_version to 0314
Posted by tg...@apache.org.
Bump dataflow.container_version to 0314
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b154e6c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b154e6c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b154e6c
Branch: refs/heads/master
Commit: 7b154e6cac58f6e95e2d0e16bcb5ffe09eb63611
Parents: 76b7991
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Mar 14 14:31:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 17 09:54:20 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7b154e6c/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index ff79681..255c326 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170307</dataflow.container_version>
+ <dataflow.container_version>beam-master-20170314</dataflow.container_version>
<dataflow.environment_major_version>6</dataflow.environment_major_version>
</properties>