You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/07/26 23:33:01 UTC

[1/2] beam git commit: Closes #3628

Repository: beam
Updated Branches:
  refs/heads/master 673937b92 -> e206b1a41


Closes #3628


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e206b1a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e206b1a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e206b1a4

Branch: refs/heads/master
Commit: e206b1a41c1f22330cb78fd1082425ea1e74dbb4
Parents: 673937b 51867ad
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 16:31:13 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 26 16:31:13 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/testing/CombineFnTester.java       | 147 ++++++++++
 .../java/org/apache/beam/sdk/TestUtils.java     |  88 ------
 .../beam/sdk/testing/CombineFnTesterTest.java   | 276 +++++++++++++++++++
 .../transforms/ApproximateQuantilesTest.java    |  24 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |   8 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   4 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |   8 +-
 .../org/apache/beam/sdk/transforms/SumTest.java |   8 +-
 9 files changed, 455 insertions(+), 120 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Migrate checkCombineFn in TestUtils to CombineFnTester

Posted by ro...@apache.org.
Migrate checkCombineFn in TestUtils to CombineFnTester

This makes CombineFnTester significantly more discoverable, and usable
without having dependencies on the test JAR.

Update existing tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51867adf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51867adf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51867adf

Branch: refs/heads/master
Commit: 51867adfea0d78a7dcb0fb09b5c257cf8e0e2ff4
Parents: 673937b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Jul 20 13:36:04 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 26 16:31:13 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/testing/CombineFnTester.java       | 147 ++++++++++
 .../java/org/apache/beam/sdk/TestUtils.java     |  88 ------
 .../beam/sdk/testing/CombineFnTesterTest.java   | 276 +++++++++++++++++++
 .../transforms/ApproximateQuantilesTest.java    |  24 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |   8 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   4 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |   8 +-
 .../org/apache/beam/sdk/transforms/SumTest.java |   8 +-
 9 files changed, 455 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
new file mode 100644
index 0000000..efd2af3
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.hamcrest.Matcher;
+
+/**
+ * Utilities for testing {@link CombineFn CombineFns}. Ensures that the {@link CombineFn} gives
+ * correct results across various permutations and shardings of the input.
+ */
+public class CombineFnTester {
+  /**
+   * Tests that the the {@link CombineFn}, when applied to the provided input, produces the provided
+   * output. Tests a variety of permutations of the input.
+   */
+  public static <InputT, AccumT, OutputT> void testCombineFn(
+      CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, final OutputT expected) {
+    testCombineFn(fn, input, is(expected));
+    Collections.shuffle(input);
+    testCombineFn(fn, input, is(expected));
+  }
+
+  public static <InputT, AccumT, OutputT> void testCombineFn(
+      CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) {
+    int size = input.size();
+    checkCombineFnShardsMultipleOrders(fn, Collections.singletonList(input), matcher);
+    checkCombineFnShardsMultipleOrders(fn, shardEvenly(input, 2), matcher);
+    if (size > 4) {
+      checkCombineFnShardsMultipleOrders(fn, shardEvenly(input, size / 2), matcher);
+      checkCombineFnShardsMultipleOrders(
+          fn, shardEvenly(input, (int) (size / Math.sqrt(size))), matcher);
+    }
+    checkCombineFnShardsMultipleOrders(fn, shardExponentially(input, 1.4), matcher);
+    checkCombineFnShardsMultipleOrders(fn, shardExponentially(input, 2), matcher);
+    checkCombineFnShardsMultipleOrders(fn, shardExponentially(input, Math.E), matcher);
+  }
+
+  private static <InputT, AccumT, OutputT> void checkCombineFnShardsMultipleOrders(
+      CombineFn<InputT, AccumT, OutputT> fn,
+      List<? extends Iterable<InputT>> shards,
+      Matcher<? super OutputT> matcher) {
+    checkCombineFnShardsSingleMerge(fn, shards, matcher);
+    checkCombineFnShardsWithEmptyAccumulators(fn, shards, matcher);
+    checkCombineFnShardsIncrementalMerging(fn, shards, matcher);
+    Collections.shuffle(shards);
+    checkCombineFnShardsSingleMerge(fn, shards, matcher);
+    checkCombineFnShardsWithEmptyAccumulators(fn, shards, matcher);
+    checkCombineFnShardsIncrementalMerging(fn, shards, matcher);
+  }
+
+  private static <InputT, AccumT, OutputT> void checkCombineFnShardsSingleMerge(
+      CombineFn<InputT, AccumT, OutputT> fn,
+      Iterable<? extends Iterable<InputT>> shards,
+      Matcher<? super OutputT> matcher) {
+    List<AccumT> accumulators = combineInputs(fn, shards);
+    AccumT merged = fn.mergeAccumulators(accumulators);
+    assertThat(fn.extractOutput(merged), matcher);
+  }
+
+  private static <InputT, AccumT, OutputT> void checkCombineFnShardsWithEmptyAccumulators(
+      CombineFn<InputT, AccumT, OutputT> fn,
+      Iterable<? extends Iterable<InputT>> shards,
+      Matcher<? super OutputT> matcher) {
+    List<AccumT> accumulators = combineInputs(fn, shards);
+    accumulators.add(0, fn.createAccumulator());
+    accumulators.add(fn.createAccumulator());
+    AccumT merged = fn.mergeAccumulators(accumulators);
+    assertThat(fn.extractOutput(merged), matcher);
+  }
+
+  private static <InputT, AccumT, OutputT> void checkCombineFnShardsIncrementalMerging(
+      CombineFn<InputT, AccumT, OutputT> fn,
+      List<? extends Iterable<InputT>> shards,
+      Matcher<? super OutputT> matcher) {
+    AccumT accumulator = null;
+    for (AccumT inputAccum : combineInputs(fn, shards)) {
+      if (accumulator == null) {
+        accumulator = inputAccum;
+      } else {
+        accumulator = fn.mergeAccumulators(Arrays.asList(accumulator, inputAccum));
+      }
+    }
+    assertThat(fn.extractOutput(accumulator), matcher);
+  }
+
+  private static <InputT, AccumT, OutputT> List<AccumT> combineInputs(
+      CombineFn<InputT, AccumT, OutputT> fn, Iterable<? extends Iterable<InputT>> shards) {
+    List<AccumT> accumulators = new ArrayList<>();
+    int maybeCompact = 0;
+    for (Iterable<InputT> shard : shards) {
+      AccumT accumulator = fn.createAccumulator();
+      for (InputT elem : shard) {
+        accumulator = fn.addInput(accumulator, elem);
+      }
+      if (maybeCompact++ % 2 == 0) {
+        accumulator = fn.compact(accumulator);
+      }
+      accumulators.add(accumulator);
+    }
+    return accumulators;
+  }
+
+  private static <T> List<List<T>> shardEvenly(List<T> input, int numShards) {
+    List<List<T>> shards = new ArrayList<>(numShards);
+    for (int i = 0; i < numShards; i++) {
+      shards.add(input.subList(i * input.size() / numShards,
+          (i + 1) * input.size() / numShards));
+    }
+    return shards;
+  }
+
+  private static <T> List<List<T>> shardExponentially(
+      List<T> input, double base) {
+    assert base > 1.0;
+    List<List<T>> shards = new ArrayList<>();
+    int end = input.size();
+    while (end > 0) {
+      int start = (int) (end / base);
+      shards.add(input.subList(start, end));
+      end = start;
+    }
+    return shards;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java
index 1224f10..5ccc1ac 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java
@@ -17,15 +17,9 @@
  */
 package org.apache.beam.sdk;
 
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.values.KV;
-import org.hamcrest.CoreMatchers;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
@@ -127,86 +121,4 @@ public class TestUtils {
           .appendText(")");
     }
   }
-
-  ////////////////////////////////////////////////////////////////////////////
-  // Utilities for testing CombineFns, ensuring they give correct results
-  // across various permutations and shardings of the input.
-
-  public static <InputT, AccumT, OutputT> void checkCombineFn(
-      CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, final OutputT expected) {
-    checkCombineFn(fn, input, CoreMatchers.is(expected));
-  }
-
-  public static <InputT, AccumT, OutputT> void checkCombineFn(
-      CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) {
-    checkCombineFnInternal(fn, input, matcher);
-    Collections.shuffle(input);
-    checkCombineFnInternal(fn, input, matcher);
-  }
-
-  private static <InputT, AccumT, OutputT> void checkCombineFnInternal(
-      CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) {
-    int size = input.size();
-    checkCombineFnShards(fn, Collections.singletonList(input), matcher);
-    checkCombineFnShards(fn, shardEvenly(input, 2), matcher);
-    if (size > 4) {
-      checkCombineFnShards(fn, shardEvenly(input, size / 2), matcher);
-      checkCombineFnShards(
-          fn, shardEvenly(input, (int) (size / Math.sqrt(size))), matcher);
-    }
-    checkCombineFnShards(fn, shardExponentially(input, 1.4), matcher);
-    checkCombineFnShards(fn, shardExponentially(input, 2), matcher);
-    checkCombineFnShards(fn, shardExponentially(input, Math.E), matcher);
-  }
-
-  public static <InputT, AccumT, OutputT> void checkCombineFnShards(
-      CombineFn<InputT, AccumT, OutputT> fn,
-      List<? extends Iterable<InputT>> shards,
-      Matcher<? super OutputT> matcher) {
-    checkCombineFnShardsInternal(fn, shards, matcher);
-    Collections.shuffle(shards);
-    checkCombineFnShardsInternal(fn, shards, matcher);
-  }
-
-  private static <InputT, AccumT, OutputT> void checkCombineFnShardsInternal(
-      CombineFn<InputT, AccumT, OutputT> fn,
-      Iterable<? extends Iterable<InputT>> shards,
-      Matcher<? super OutputT> matcher) {
-    List<AccumT> accumulators = new ArrayList<>();
-    int maybeCompact = 0;
-    for (Iterable<InputT> shard : shards) {
-      AccumT accumulator = fn.createAccumulator();
-      for (InputT elem : shard) {
-        accumulator = fn.addInput(accumulator, elem);
-      }
-      if (maybeCompact++ % 2 == 0) {
-        accumulator = fn.compact(accumulator);
-      }
-      accumulators.add(accumulator);
-    }
-    AccumT merged = fn.mergeAccumulators(accumulators);
-    assertThat(fn.extractOutput(merged), matcher);
-  }
-
-  private static <T> List<List<T>> shardEvenly(List<T> input, int numShards) {
-    List<List<T>> shards = new ArrayList<>(numShards);
-    for (int i = 0; i < numShards; i++) {
-      shards.add(input.subList(i * input.size() / numShards,
-                               (i + 1) * input.size() / numShards));
-    }
-    return shards;
-  }
-
-  private static <T> List<List<T>> shardExponentially(
-      List<T> input, double base) {
-    assert base > 1.0;
-    List<List<T>> shards = new ArrayList<>();
-    int end = input.size();
-    while (end > 0) {
-      int start = (int) (end / base);
-      shards.add(input.subList(start, end));
-      end = start;
-    }
-    return shards;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java
new file mode 100644
index 0000000..15198b2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CombineFnTester}.
+ */
+@RunWith(JUnit4.class)
+public class CombineFnTesterTest {
+  @Test
+  public void checksMergeWithEmptyAccumulators() {
+    final AtomicBoolean sawEmpty = new AtomicBoolean(false);
+    CombineFn<Integer, Integer, Integer> combineFn =
+        new CombineFn<Integer, Integer, Integer>() {
+          @Override
+          public Integer createAccumulator() {
+            return 0;
+          }
+
+          @Override
+          public Integer addInput(Integer accumulator, Integer input) {
+            return accumulator + input;
+          }
+
+          @Override
+          public Integer mergeAccumulators(Iterable<Integer> accumulators) {
+            int result = 0;
+            for (int accum : accumulators) {
+              if (accum == 0) {
+                sawEmpty.set(true);
+              }
+              result += accum;
+            }
+            return result;
+          }
+
+          @Override
+          public Integer extractOutput(Integer accumulator) {
+            return accumulator;
+          }
+        };
+
+    CombineFnTester.testCombineFn(combineFn, Arrays.asList(1, 2, 3, 4, 5), 15);
+    assertThat(sawEmpty.get(), is(true));
+  }
+
+  @Test
+  public void checksWithSingleShard() {
+    final AtomicBoolean sawSingleShard = new AtomicBoolean();
+    CombineFn<Integer, Integer, Integer> combineFn =
+        new CombineFn<Integer, Integer, Integer>() {
+          int accumCount = 0;
+
+          @Override
+          public Integer createAccumulator() {
+            accumCount++;
+            return 0;
+          }
+
+          @Override
+          public Integer addInput(Integer accumulator, Integer input) {
+            return accumulator + input;
+          }
+
+          @Override
+          public Integer mergeAccumulators(Iterable<Integer> accumulators) {
+            int result = 0;
+            for (int accum : accumulators) {
+              result += accum;
+            }
+            return result;
+          }
+
+          @Override
+          public Integer extractOutput(Integer accumulator) {
+            if (accumCount == 1) {
+              sawSingleShard.set(true);
+            }
+            accumCount = 0;
+            return accumulator;
+          }
+        };
+
+    CombineFnTester.testCombineFn(combineFn, Arrays.asList(1, 2, 3, 4, 5), 15);
+    assertThat(sawSingleShard.get(), is(true));
+  }
+
+  @Test
+  public void checksWithShards() {
+    final AtomicBoolean sawManyShards = new AtomicBoolean();
+    CombineFn<Integer, Integer, Integer> combineFn =
+        new CombineFn<Integer, Integer, Integer>() {
+
+          @Override
+          public Integer createAccumulator() {
+            return 0;
+          }
+
+          @Override
+          public Integer addInput(Integer accumulator, Integer input) {
+            return accumulator + input;
+          }
+
+          @Override
+          public Integer mergeAccumulators(Iterable<Integer> accumulators) {
+            if (Iterables.size(accumulators) > 2) {
+              sawManyShards.set(true);
+            }
+            int result = 0;
+            for (int accum : accumulators) {
+              result += accum;
+            }
+            return result;
+          }
+
+          @Override
+          public Integer extractOutput(Integer accumulator) {
+            return accumulator;
+          }
+        };
+
+    CombineFnTester.testCombineFn(
+        combineFn, Arrays.asList(1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3), 30);
+    assertThat(sawManyShards.get(), is(true));
+  }
+
+  @Test
+  public void checksWithMultipleMerges() {
+    final AtomicBoolean sawMultipleMerges = new AtomicBoolean();
+    CombineFn<Integer, Integer, Integer> combineFn =
+        new CombineFn<Integer, Integer, Integer>() {
+          int mergeCalls = 0;
+
+          @Override
+          public Integer createAccumulator() {
+            return 0;
+          }
+
+          @Override
+          public Integer addInput(Integer accumulator, Integer input) {
+            return accumulator + input;
+          }
+
+          @Override
+          public Integer mergeAccumulators(Iterable<Integer> accumulators) {
+            mergeCalls++;
+            int result = 0;
+            for (int accum : accumulators) {
+              result += accum;
+            }
+            return result;
+          }
+
+          @Override
+          public Integer extractOutput(Integer accumulator) {
+            if (mergeCalls > 1) {
+              sawMultipleMerges.set(true);
+            }
+            mergeCalls = 0;
+            return accumulator;
+          }
+        };
+
+    CombineFnTester.testCombineFn(combineFn, Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5), 30);
+    assertThat(sawMultipleMerges.get(), is(true));
+  }
+
+  @Test
+  public void checksAlternateOrder() {
+    final AtomicBoolean sawOutOfOrder = new AtomicBoolean();
+    CombineFn<Integer, List<Integer>, Integer> combineFn =
+        new CombineFn<Integer, List<Integer>, Integer>() {
+          @Override
+          public List<Integer> createAccumulator() {
+            return new ArrayList<>();
+          }
+
+          @Override
+          public List<Integer> addInput(List<Integer> accumulator, Integer input) {
+            // If the input is being added to an empty accumulator, it's not known to be
+            // out of order, and it cannot be compared to the previous element. If the elements
+            // are out of order (relative to the input) a greater element will be added before
+            // a smaller one.
+            if (!accumulator.isEmpty() && accumulator.get(accumulator.size() - 1) > input) {
+              sawOutOfOrder.set(true);
+            }
+            accumulator.add(input);
+            return accumulator;
+          }
+
+          @Override
+          public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) {
+            List<Integer> result = new ArrayList<>();
+            for (List<Integer> accum : accumulators) {
+              result.addAll(accum);
+            }
+            return result;
+          }
+
+          @Override
+          public Integer extractOutput(List<Integer> accumulator) {
+            int value = 0;
+            for (int i : accumulator) {
+              value += i;
+            }
+            return value;
+          }
+        };
+
+    CombineFnTester.testCombineFn(
+        combineFn, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14), 105);
+    assertThat(sawOutOfOrder.get(), is(true));
+  }
+
+  @Test
+  public void usesMatcher() {
+    final AtomicBoolean matcherUsed = new AtomicBoolean();
+    Matcher<Integer> matcher =
+        new TypeSafeMatcher<Integer>() {
+          @Override
+          public void describeTo(Description description) {}
+
+          @Override
+          protected boolean matchesSafely(Integer item) {
+            matcherUsed.set(true);
+            return item == 30;
+          }
+        };
+    CombineFnTester.testCombineFn(
+        Sum.ofIntegers(), Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5), matcher);
+    assertThat(matcherUsed.get(), is(true));
+    try {
+      CombineFnTester.testCombineFn(
+          Sum.ofIntegers(), Arrays.asList(1, 2, 3, 4, 5), Matchers.not(Matchers.equalTo(15)));
+    } catch (AssertionError ignored) {
+      // Success! Return to avoid the call to fail();
+      return;
+    }
+    fail("The matcher should have failed, throwing an error");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 9e0b3cc..e180833 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -129,7 +129,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testSingleton() {
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(5),
         Arrays.asList(389),
         Arrays.asList(389, 389, 389, 389, 389));
@@ -137,7 +137,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testSimpleQuantiles() {
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(5),
         intRange(101),
         Arrays.asList(0, 25, 50, 75, 100));
@@ -145,7 +145,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testUnevenQuantiles() {
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(37),
         intRange(5000),
         quantileMatcher(5000, 37, 20 /* tolerance */));
@@ -153,7 +153,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testLargerQuantiles() {
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(50),
         intRange(10001),
         quantileMatcher(10001, 50, 20 /* tolerance */));
@@ -161,7 +161,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testTightEpsilon() {
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(10).withEpsilon(0.01),
         intRange(10001),
         quantileMatcher(10001, 10, 5 /* tolerance */));
@@ -174,7 +174,7 @@ public class ApproximateQuantilesTest {
     for (int i = 0; i < 10; i++) {
       all.addAll(intRange(size));
     }
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(5),
         all,
         Arrays.asList(0, 25, 50, 75, 100));
@@ -190,7 +190,7 @@ public class ApproximateQuantilesTest {
     for (int i = 300; i < 1000; i++) {
       all.add(3);
     }
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(5),
         all,
         Arrays.asList(1, 2, 3, 3, 3));
@@ -202,7 +202,7 @@ public class ApproximateQuantilesTest {
     for (int i = 1; i < 1000; i++) {
       all.add((int) Math.log(i));
     }
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(5),
         all,
         Arrays.asList(0, 5, 6, 6, 6));
@@ -214,7 +214,7 @@ public class ApproximateQuantilesTest {
     for (int i = 1; i < 1000; i++) {
       all.add(1000 / i);
     }
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<Integer>create(5),
         all,
         Arrays.asList(1, 1, 2, 4, 1000));
@@ -224,11 +224,11 @@ public class ApproximateQuantilesTest {
   public void testAlternateComparator() {
     List<String> inputs = Arrays.asList(
         "aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz");
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.<String>create(3),
         inputs,
         Arrays.asList("aa", "b", "zz"));
-    checkCombineFn(
+    testCombineFn(
         ApproximateQuantilesCombineFn.create(3, new OrderByLength()),
         inputs,
         Arrays.asList("b", "aaa", "ccccc"));

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index b24d82d..52fedc6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
@@ -695,11 +695,11 @@ public class CombineTest implements Serializable {
 
   @Test
   public void testBinaryCombineFnWithNulls() {
-    checkCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45);
-    checkCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30);
-    checkCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18);
-    checkCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12);
-    checkCombineFn(new NullCombiner(), Arrays.<Integer>asList(null, null, null), 8);
+    testCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45);
+    testCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30);
+    testCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18);
+    testCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12);
+    testCombineFn(new NullCombiner(), Arrays.<Integer>asList(null, null, null), 8);
   }
 
   private static final class TestProdInt extends Combine.BinaryCombineIntegerFn {

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 52043e1..a298a5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -45,7 +45,7 @@ public class MaxTest {
 
   @Test
   public void testMaxIntegerFn() {
-    checkCombineFn(
+    testCombineFn(
         Max.ofIntegers(),
         Lists.newArrayList(1, 2, 3, 4),
         4);
@@ -53,7 +53,7 @@ public class MaxTest {
 
   @Test
   public void testMaxLongFn() {
-    checkCombineFn(
+    testCombineFn(
         Max.ofLongs(),
         Lists.newArrayList(1L, 2L, 3L, 4L),
         4L);
@@ -61,7 +61,7 @@ public class MaxTest {
 
   @Test
   public void testMaxDoubleFn() {
-    checkCombineFn(
+    testCombineFn(
         Max.ofDoubles(),
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         4.0);

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
index 79ebc25..e138135 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.Lists;
@@ -64,7 +64,7 @@ public class MeanTest {
 
   @Test
   public void testMeanFn() throws Exception {
-    checkCombineFn(
+    testCombineFn(
         Mean.<Integer>of(),
         Lists.newArrayList(1, 2, 3, 4),
         2.5);

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index 1ece09b..a515b63 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -45,7 +45,7 @@ public class MinTest {
   }
   @Test
   public void testMinIntegerFn() {
-    checkCombineFn(
+    testCombineFn(
         Min.ofIntegers(),
         Lists.newArrayList(1, 2, 3, 4),
         1);
@@ -53,7 +53,7 @@ public class MinTest {
 
   @Test
   public void testMinLongFn() {
-    checkCombineFn(
+    testCombineFn(
         Min.ofLongs(),
         Lists.newArrayList(1L, 2L, 3L, 4L),
         1L);
@@ -61,7 +61,7 @@ public class MinTest {
 
   @Test
   public void testMinDoubleFn() {
-    checkCombineFn(
+    testCombineFn(
         Min.ofDoubles(),
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         1.0);

http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index 9d2c6f6..e5bf904 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -51,7 +51,7 @@ public class SumTest {
 
   @Test
   public void testSumIntegerFn() {
-    checkCombineFn(
+    testCombineFn(
         Sum.ofIntegers(),
         Lists.newArrayList(1, 2, 3, 4),
         10);
@@ -59,7 +59,7 @@ public class SumTest {
 
   @Test
   public void testSumLongFn() {
-    checkCombineFn(
+    testCombineFn(
         Sum.ofLongs(),
         Lists.newArrayList(1L, 2L, 3L, 4L),
         10L);
@@ -67,7 +67,7 @@ public class SumTest {
 
   @Test
   public void testSumDoubleFn() {
-    checkCombineFn(
+    testCombineFn(
         Sum.ofDoubles(),
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         10.0);