You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/10/17 02:44:05 UTC

[1/2] beam git commit: This closes #3861

Repository: beam
Updated Branches:
  refs/heads/master 8ff9c003f -> 52ad6f12a


This closes #3861


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52ad6f12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52ad6f12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52ad6f12

Branch: refs/heads/master
Commit: 52ad6f12a95219f5318bcb7230068384b22c9d7a
Parents: 8ff9c00 2f0a53d
Author: Thomas Groh <tg...@google.com>
Authored: Mon Oct 16 19:43:54 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Oct 16 19:43:54 2017 -0700

----------------------------------------------------------------------
 .../src/main/resources/beam/findbugs-filter.xml |   8 +-
 .../sdk/transforms/ApproximateQuantiles.java    |  10 +-
 .../transforms/ApproximateQuantilesTest.java    | 528 +++++++++++--------
 3 files changed, 326 insertions(+), 220 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: [BEAM-409] Avoiding integer division in ceil

Posted by tg...@apache.org.
[BEAM-409] Avoiding integer division in ceil

Cast the denominator of the division inside the ceil call to a float, so that
the quotient is no longer truncated by integer division before it is rounded up.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f0a53d7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f0a53d7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f0a53d7

Branch: refs/heads/master
Commit: 2f0a53d7519c3ad556901cf99aad0bbf2b206f3e
Parents: 8ff9c00
Author: Daniel Oliveira <da...@gmail.com>
Authored: Fri Sep 15 15:32:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Oct 16 19:43:54 2017 -0700

----------------------------------------------------------------------
 .../src/main/resources/beam/findbugs-filter.xml |   8 +-
 .../sdk/transforms/ApproximateQuantiles.java    |  10 +-
 .../transforms/ApproximateQuantilesTest.java    | 528 +++++++++++--------
 3 files changed, 326 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2f0a53d7/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index e54cd0b..bf10571 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -314,13 +314,7 @@
     <Class name="org.apache.beam.sdk.testing.WindowSupplier"/>
     <Field name="windows"/>
     <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--[BEAM-409] Inconsistent synchronization -->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.sdk.transforms.ApproximateQuantiles$ApproximateQuantilesCombineFn"/>
-    <Method name="create"/>
-    <Bug pattern="ICAST_INT_CAST_TO_DOUBLE_PASSED_TO_CEIL"/>
-    <!--[BEAM-409] Integral value cast to double and then passed to Math.ceil-->
+    <!--[BEAM-407] Inconsistent synchronization -->
   </Match>
   <Match>
     <Class name="org.apache.beam.sdk.transforms.ApproximateQuantiles$QuantileBuffer"/>

http://git-wip-us.apache.org/repos/asf/beam/blob/2f0a53d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index e952af2..ff37024 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -341,7 +341,7 @@ public class ApproximateQuantiles {
         b++;
       }
       b--;
-      int k = Math.max(2, (int) Math.ceil(maxNumElements / (1 << (b - 1))));
+      int k = Math.max(2, (int) Math.ceil(maxNumElements / (float) (1 << (b - 1))));
       return new ApproximateQuantilesCombineFn<T, ComparatorT>(
           numQuantiles, compareFn, k, b, maxNumElements);
     }
@@ -366,6 +366,14 @@ public class ApproximateQuantiles {
           .add(DisplayData.item("comparer", compareFn.getClass())
             .withLabel("Record Comparer"));
     }
+
+    int getNumBuffers() {
+      return numBuffers;
+    }
+
+    int getBufferSize() {
+      return bufferSize;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/2f0a53d7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index e180833..2657e07 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -21,10 +21,13 @@ import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
 
+import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
@@ -41,270 +44,371 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /**
  * Tests for {@link ApproximateQuantiles}.
  */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
 public class ApproximateQuantilesTest {
 
-  static final List<KV<String, Integer>> TABLE = Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 2),
-      KV.of("a", 3),
-      KV.of("b", 1),
-      KV.of("b", 10),
-      KV.of("b", 10),
-      KV.of("b", 100)
-  );
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  public PCollection<KV<String, Integer>> createInputTable(Pipeline p) {
-    return p.apply(Create.of(TABLE).withCoder(
-        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-  }
+  /** Tests for the overall combiner behavior. */
+  @RunWith(JUnit4.class)
+  public static class CombinerTests {
+    static final List<KV<String, Integer>> TABLE = Arrays.asList(
+        KV.of("a", 1),
+        KV.of("a", 2),
+        KV.of("a", 3),
+        KV.of("b", 1),
+        KV.of("b", 10),
+        KV.of("b", 10),
+        KV.of("b", 100)
+    );
+
+    @Rule
+    public TestPipeline p = TestPipeline.create();
+
+    public PCollection<KV<String, Integer>> createInputTable(Pipeline p) {
+      return p.apply(Create.of(TABLE).withCoder(
+          KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testQuantilesGlobally() {
-    PCollection<Integer> input = intRangeCollection(p, 101);
-    PCollection<List<Integer>> quantiles =
-        input.apply(ApproximateQuantiles.<Integer>globally(5));
+    @Test
+    @Category(NeedsRunner.class)
+    public void testQuantilesGlobally() {
+      PCollection<Integer> input = intRangeCollection(p, 101);
+      PCollection<List<Integer>> quantiles =
+          input.apply(ApproximateQuantiles.<Integer>globally(5));
 
-    PAssert.that(quantiles)
-        .containsInAnyOrder(Arrays.asList(0, 25, 50, 75, 100));
-    p.run();
-  }
+      PAssert.that(quantiles)
+          .containsInAnyOrder(Arrays.asList(0, 25, 50, 75, 100));
+      p.run();
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testQuantilesGobally_comparable() {
-    PCollection<Integer> input = intRangeCollection(p, 101);
-    PCollection<List<Integer>> quantiles =
-        input.apply(
-            ApproximateQuantiles.globally(5, new DescendingIntComparator()));
-
-    PAssert.that(quantiles)
-        .containsInAnyOrder(Arrays.asList(100, 75, 50, 25, 0));
-    p.run();
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testQuantilesGobally_comparable() {
+      PCollection<Integer> input = intRangeCollection(p, 101);
+      PCollection<List<Integer>> quantiles =
+          input.apply(
+              ApproximateQuantiles.globally(5, new DescendingIntComparator()));
+
+      PAssert.that(quantiles)
+          .containsInAnyOrder(Arrays.asList(100, 75, 50, 25, 0));
+      p.run();
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testQuantilesPerKey() {
-    PCollection<KV<String, Integer>> input = createInputTable(p);
-    PCollection<KV<String, List<Integer>>> quantiles = input.apply(
-        ApproximateQuantiles.<String, Integer>perKey(2));
+    @Test
+    @Category(NeedsRunner.class)
+    public void testQuantilesPerKey() {
+      PCollection<KV<String, Integer>> input = createInputTable(p);
+      PCollection<KV<String, List<Integer>>> quantiles = input.apply(
+          ApproximateQuantiles.<String, Integer>perKey(2));
 
-    PAssert.that(quantiles)
-        .containsInAnyOrder(
-            KV.of("a", Arrays.asList(1, 3)),
-            KV.of("b", Arrays.asList(1, 100)));
-    p.run();
+      PAssert.that(quantiles)
+          .containsInAnyOrder(
+              KV.of("a", Arrays.asList(1, 3)),
+              KV.of("b", Arrays.asList(1, 100)));
+      p.run();
 
-  }
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testQuantilesPerKey_reversed() {
-    PCollection<KV<String, Integer>> input = createInputTable(p);
-    PCollection<KV<String, List<Integer>>> quantiles = input.apply(
-        ApproximateQuantiles.<String, Integer, DescendingIntComparator>perKey(
-            2, new DescendingIntComparator()));
-
-    PAssert.that(quantiles)
-        .containsInAnyOrder(
-            KV.of("a", Arrays.asList(3, 1)),
-            KV.of("b", Arrays.asList(100, 1)));
-    p.run();
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testQuantilesPerKey_reversed() {
+      PCollection<KV<String, Integer>> input = createInputTable(p);
+      PCollection<KV<String, List<Integer>>> quantiles = input.apply(
+          ApproximateQuantiles.<String, Integer, DescendingIntComparator>perKey(
+              2, new DescendingIntComparator()));
+
+      PAssert.that(quantiles)
+          .containsInAnyOrder(
+              KV.of("a", Arrays.asList(3, 1)),
+              KV.of("b", Arrays.asList(100, 1)));
+      p.run();
+    }
 
-  @Test
-  public void testSingleton() {
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(5),
-        Arrays.asList(389),
-        Arrays.asList(389, 389, 389, 389, 389));
-  }
+    @Test
+    public void testSingleton() {
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(5),
+          Arrays.asList(389),
+          Arrays.asList(389, 389, 389, 389, 389));
+    }
 
-  @Test
-  public void testSimpleQuantiles() {
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(5),
-        intRange(101),
-        Arrays.asList(0, 25, 50, 75, 100));
-  }
+    @Test
+    public void testSimpleQuantiles() {
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(5),
+          intRange(101),
+          Arrays.asList(0, 25, 50, 75, 100));
+    }
 
-  @Test
-  public void testUnevenQuantiles() {
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(37),
-        intRange(5000),
-        quantileMatcher(5000, 37, 20 /* tolerance */));
-  }
+    @Test
+    public void testUnevenQuantiles() {
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(37),
+          intRange(5000),
+          quantileMatcher(5000, 37, 20 /* tolerance */));
+    }
 
-  @Test
-  public void testLargerQuantiles() {
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(50),
-        intRange(10001),
-        quantileMatcher(10001, 50, 20 /* tolerance */));
-  }
+    @Test
+    public void testLargerQuantiles() {
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(50),
+          intRange(10001),
+          quantileMatcher(10001, 50, 20 /* tolerance */));
+    }
 
-  @Test
-  public void testTightEpsilon() {
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(10).withEpsilon(0.01),
-        intRange(10001),
-        quantileMatcher(10001, 10, 5 /* tolerance */));
-  }
+    @Test
+    public void testTightEpsilon() {
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(10).withEpsilon(0.01),
+          intRange(10001),
+          quantileMatcher(10001, 10, 5 /* tolerance */));
+    }
 
-  @Test
-  public void testDuplicates() {
-    int size = 101;
-    List<Integer> all = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      all.addAll(intRange(size));
+    @Test
+    public void testDuplicates() {
+      int size = 101;
+      List<Integer> all = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        all.addAll(intRange(size));
+      }
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(5),
+          all,
+          Arrays.asList(0, 25, 50, 75, 100));
     }
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(5),
-        all,
-        Arrays.asList(0, 25, 50, 75, 100));
-  }
 
-  @Test
-  public void testLotsOfDuplicates() {
-    List<Integer> all = new ArrayList<>();
-    all.add(1);
-    for (int i = 1; i < 300; i++) {
-      all.add(2);
+    @Test
+    public void testLotsOfDuplicates() {
+      List<Integer> all = new ArrayList<>();
+      all.add(1);
+      for (int i = 1; i < 300; i++) {
+        all.add(2);
+      }
+      for (int i = 300; i < 1000; i++) {
+        all.add(3);
+      }
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(5),
+          all,
+          Arrays.asList(1, 2, 3, 3, 3));
     }
-    for (int i = 300; i < 1000; i++) {
-      all.add(3);
+
+    @Test
+    public void testLogDistribution() {
+      List<Integer> all = new ArrayList<>();
+      for (int i = 1; i < 1000; i++) {
+        all.add((int) Math.log(i));
+      }
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(5),
+          all,
+          Arrays.asList(0, 5, 6, 6, 6));
     }
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(5),
-        all,
-        Arrays.asList(1, 2, 3, 3, 3));
-  }
 
-  @Test
-  public void testLogDistribution() {
-    List<Integer> all = new ArrayList<>();
-    for (int i = 1; i < 1000; i++) {
-      all.add((int) Math.log(i));
+    @Test
+    public void testZipfianDistribution() {
+      List<Integer> all = new ArrayList<>();
+      for (int i = 1; i < 1000; i++) {
+        all.add(1000 / i);
+      }
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<Integer>create(5),
+          all,
+          Arrays.asList(1, 1, 2, 4, 1000));
     }
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(5),
-        all,
-        Arrays.asList(0, 5, 6, 6, 6));
-  }
 
-  @Test
-  public void testZipfianDistribution() {
-    List<Integer> all = new ArrayList<>();
-    for (int i = 1; i < 1000; i++) {
-      all.add(1000 / i);
+    @Test
+    public void testAlternateComparator() {
+      List<String> inputs = Arrays.asList(
+          "aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz");
+      testCombineFn(
+          ApproximateQuantilesCombineFn.<String>create(3),
+          inputs,
+          Arrays.asList("aa", "b", "zz"));
+      testCombineFn(
+          ApproximateQuantilesCombineFn.create(3, new OrderByLength()),
+          inputs,
+          Arrays.asList("b", "aaa", "ccccc"));
     }
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<Integer>create(5),
-        all,
-        Arrays.asList(1, 1, 2, 4, 1000));
-  }
 
-  @Test
-  public void testAlternateComparator() {
-    List<String> inputs = Arrays.asList(
-        "aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz");
-    testCombineFn(
-        ApproximateQuantilesCombineFn.<String>create(3),
-        inputs,
-        Arrays.asList("aa", "b", "zz"));
-    testCombineFn(
-        ApproximateQuantilesCombineFn.create(3, new OrderByLength()),
-        inputs,
-        Arrays.asList("b", "aaa", "ccccc"));
-  }
+    @Test
+    public void testDisplayData() {
+      Top.Natural<Integer> comparer = new Top.Natural<Integer>();
+      PTransform<?, ?> approxQuanitiles = ApproximateQuantiles.globally(20, comparer);
+      DisplayData displayData = DisplayData.from(approxQuanitiles);
 
-  @Test
-  public void testDisplayData() {
-    Top.Natural<Integer> comparer = new Top.Natural<Integer>();
-    PTransform<?, ?> approxQuanitiles = ApproximateQuantiles.globally(20, comparer);
-    DisplayData displayData = DisplayData.from(approxQuanitiles);
+      assertThat(displayData, hasDisplayItem("numQuantiles", 20));
+      assertThat(displayData, hasDisplayItem("comparer", comparer.getClass()));
+    }
 
-    assertThat(displayData, hasDisplayItem("numQuantiles", 20));
-    assertThat(displayData, hasDisplayItem("comparer", comparer.getClass()));
-  }
+    private Matcher<Iterable<? extends Integer>> quantileMatcher(
+        int size, int numQuantiles, int absoluteError) {
+      List<Matcher<? super Integer>> quantiles = new ArrayList<>();
+      quantiles.add(CoreMatchers.is(0));
+      for (int k = 1; k < numQuantiles - 1; k++) {
+        int expected = (int) (((double) (size - 1)) * k / (numQuantiles - 1));
+        quantiles.add(new Between<>(
+            expected - absoluteError, expected + absoluteError));
+      }
+      quantiles.add(CoreMatchers.is(size - 1));
+      return contains(quantiles);
+    }
+
+    private static class Between<T extends Comparable<T>>
+        extends TypeSafeDiagnosingMatcher<T> {
+      private final T min;
+      private final T max;
+
+      private Between(T min, T max) {
+        this.min = min;
+        this.max = max;
+      }
 
-  private Matcher<Iterable<? extends Integer>> quantileMatcher(
-      int size, int numQuantiles, int absoluteError) {
-    List<Matcher<? super Integer>> quantiles = new ArrayList<>();
-    quantiles.add(CoreMatchers.is(0));
-    for (int k = 1; k < numQuantiles - 1; k++) {
-      int expected = (int) (((double) (size - 1)) * k / (numQuantiles - 1));
-      quantiles.add(new Between<>(
-          expected - absoluteError, expected + absoluteError));
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("is between " + min + " and " + max);
+      }
+
+      @Override
+      protected boolean matchesSafely(T item, Description mismatchDescription) {
+        return min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
+      }
     }
-    quantiles.add(CoreMatchers.is(size - 1));
-    return contains(quantiles);
-  }
 
-  private static class Between<T extends Comparable<T>>
-      extends TypeSafeDiagnosingMatcher<T> {
-    private final T min;
-    private final T max;
-    private Between(T min, T max) {
-      this.min = min;
-      this.max = max;
+    private static class DescendingIntComparator implements
+        SerializableComparator<Integer> {
+      @Override
+      public int compare(Integer o1, Integer o2) {
+        return o2.compareTo(o1);
+      }
     }
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("is between " + min + " and " + max);
+
+    private static class OrderByLength implements Comparator<String>, Serializable {
+      @Override
+      public int compare(String a, String b) {
+        if (a.length() != b.length()) {
+          return a.length() - b.length();
+        } else {
+          return a.compareTo(b);
+        }
+      }
     }
 
-    @Override
-    protected boolean matchesSafely(T item, Description mismatchDescription) {
-      return min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
+
+    private PCollection<Integer> intRangeCollection(Pipeline p, int size) {
+      return p.apply("CreateIntsUpTo(" + size + ")", Create.of(intRange(size)));
     }
-  }
 
-  private static class DescendingIntComparator implements
-      SerializableComparator<Integer> {
-    @Override
-    public int compare(Integer o1, Integer o2) {
-      return o2.compareTo(o1);
+    private List<Integer> intRange(int size) {
+      List<Integer> all = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        all.add(i);
+      }
+      return all;
     }
   }
 
-  private static class OrderByLength implements Comparator<String>, Serializable {
-    @Override
-    public int compare(String a, String b) {
-      if (a.length() != b.length()) {
-        return a.length() - b.length();
-      } else {
-        return a.compareTo(b);
+  /** Tests to ensure we are calculating the optimal buffers. */
+  @RunWith(Parameterized.class)
+  public static class BufferTests {
+
+    private final double epsilon;
+    private final long maxInputSize;
+    private final int expectedNumBuffers;
+    private final int expectedBufferSize;
+    private final ApproximateQuantilesCombineFn<?, ?> combineFn;
+
+    /**
+     * Test data taken from "Munro-Paterson Algorithm" reference values table of "Approximate
+     * Medians and other Quantiles in One Pass and with Limited Memory" paper.
+     *
+     * @see ApproximateQuantilesCombineFn for paper reference.
+     */
+    private static final double[] epsilons = new double[]{0.1, 0.05, 0.01, 0.005, 0.001};
+    private static final int[] maxElementExponents = new int[]{5, 6, 7, 8, 9};
+
+    private static final int[][] expectedNumBuffersValues = new int[][]{
+        {11, 14, 17, 21, 24},
+        {11, 14, 17, 20, 23},
+        {9, 11, 14, 17, 21},
+        {8, 11, 14, 17, 20},
+        {6, 9, 11, 14, 17},
+    };
+
+    private static final int[][] expectedBufferSizeValues = new int[][]{
+        {98, 123, 153, 96, 120},
+        {98, 123, 153, 191, 239},
+        {391, 977, 1221, 1526, 954},
+        {782, 977, 1221, 1526, 1908},
+        {3125, 3907, 9766, 12208, 15259},
+    };
+
+    @Parameterized.Parameters(name = "{index}: epsilon = {0}, maxInputSize = {1}")
+    public static Collection<Object[]> data() {
+      Collection<Object[]> testData = Lists.newArrayList();
+      for (int i = 0; i < epsilons.length; i++) {
+        for (int j = 0; j < maxElementExponents.length; j++) {
+          testData.add(new Object[]{
+              epsilons[i],
+              (long) Math.pow(10, maxElementExponents[j]),
+              expectedNumBuffersValues[i][j],
+              expectedBufferSizeValues[i][j]
+          });
+        }
       }
+
+      return testData;
     }
-  }
 
+    public BufferTests(
+        Double epsilon, Long maxInputSize, Integer expectedNumBuffers, Integer expectedBufferSize) {
+      this.epsilon = epsilon;
+      this.maxInputSize = maxInputSize;
+      this.expectedNumBuffers = expectedNumBuffers;
+      this.expectedBufferSize = expectedBufferSize;
 
-  private PCollection<Integer> intRangeCollection(Pipeline p, int size) {
-    return p.apply("CreateIntsUpTo(" + size + ")", Create.of(intRange(size)));
-  }
+      this.combineFn = ApproximateQuantilesCombineFn.create(
+          10, new Top.Natural<Long>(), maxInputSize, epsilon);
+    }
+
+    /**
+     * Verify the buffers are efficiently calculated according to the reference table values.
+     */
+    @Test
+    public void testEfficiency() {
+      assertEquals("Number of buffers", expectedNumBuffers, combineFn.getNumBuffers());
+      assertEquals("Buffer size", expectedBufferSize, combineFn.getBufferSize());
+    }
 
-  private List<Integer> intRange(int size) {
-    List<Integer> all = new ArrayList<>(size);
-    for (int i = 0; i < size; i++) {
-      all.add(i);
+    /**
+     * Verify that buffers are correct according to the two constraint equations.
+     */
+    @Test
+    public void testCorrectness() {
+      int b = combineFn.getNumBuffers();
+      int k = combineFn.getBufferSize();
+      long n = this.maxInputSize;
+
+      assertThat(
+          "(b-2)2^(b-2) + 1/2 <= eN",
+          (b - 2) * (1 << (b - 2)) + 0.5,
+          Matchers.lessThanOrEqualTo(this.epsilon * n));
+      assertThat(
+          "k2^(b-1) >= N",
+          Math.pow(k * 2, b - 1),
+          Matchers.greaterThanOrEqualTo((double) n));
     }
-    return all;
   }
 }