You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/10/17 02:44:06 UTC
[2/2] beam git commit: [BEAM-409] Avoiding integer division in ceil
[BEAM-409] Avoiding integer division in ceil
Casting the denominator in this division in the ceil call to a float to
avoid an incorrect integer division causing a bug.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f0a53d7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f0a53d7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f0a53d7
Branch: refs/heads/master
Commit: 2f0a53d7519c3ad556901cf99aad0bbf2b206f3e
Parents: 8ff9c00
Author: Daniel Oliveira <da...@gmail.com>
Authored: Fri Sep 15 15:32:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Oct 16 19:43:54 2017 -0700
----------------------------------------------------------------------
.../src/main/resources/beam/findbugs-filter.xml | 8 +-
.../sdk/transforms/ApproximateQuantiles.java | 10 +-
.../transforms/ApproximateQuantilesTest.java | 528 +++++++++++--------
3 files changed, 326 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2f0a53d7/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index e54cd0b..bf10571 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -314,13 +314,7 @@
<Class name="org.apache.beam.sdk.testing.WindowSupplier"/>
<Field name="windows"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
- <!--[BEAM-409] Inconsistent synchronization -->
- </Match>
- <Match>
- <Class name="org.apache.beam.sdk.transforms.ApproximateQuantiles$ApproximateQuantilesCombineFn"/>
- <Method name="create"/>
- <Bug pattern="ICAST_INT_CAST_TO_DOUBLE_PASSED_TO_CEIL"/>
- <!--[BEAM-409] Integral value cast to double and then passed to Math.ceil-->
+ <!--[BEAM-407] Inconsistent synchronization -->
</Match>
<Match>
<Class name="org.apache.beam.sdk.transforms.ApproximateQuantiles$QuantileBuffer"/>
http://git-wip-us.apache.org/repos/asf/beam/blob/2f0a53d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index e952af2..ff37024 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -341,7 +341,7 @@ public class ApproximateQuantiles {
b++;
}
b--;
- int k = Math.max(2, (int) Math.ceil(maxNumElements / (1 << (b - 1))));
+ int k = Math.max(2, (int) Math.ceil(maxNumElements / (float) (1 << (b - 1))));
return new ApproximateQuantilesCombineFn<T, ComparatorT>(
numQuantiles, compareFn, k, b, maxNumElements);
}
@@ -366,6 +366,14 @@ public class ApproximateQuantiles {
.add(DisplayData.item("comparer", compareFn.getClass())
.withLabel("Record Comparer"));
}
+
+ int getNumBuffers() {
+ return numBuffers;
+ }
+
+ int getBufferSize() {
+ return bufferSize;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/2f0a53d7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index e180833..2657e07 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -21,10 +21,13 @@ import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
@@ -41,270 +44,371 @@ import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
/**
* Tests for {@link ApproximateQuantiles}.
*/
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
public class ApproximateQuantilesTest {
- static final List<KV<String, Integer>> TABLE = Arrays.asList(
- KV.of("a", 1),
- KV.of("a", 2),
- KV.of("a", 3),
- KV.of("b", 1),
- KV.of("b", 10),
- KV.of("b", 10),
- KV.of("b", 100)
- );
-
- @Rule
- public TestPipeline p = TestPipeline.create();
-
- public PCollection<KV<String, Integer>> createInputTable(Pipeline p) {
- return p.apply(Create.of(TABLE).withCoder(
- KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
- }
+ /** Tests for the overall combiner behavior. */
+ @RunWith(JUnit4.class)
+ public static class CombinerTests {
+ static final List<KV<String, Integer>> TABLE = Arrays.asList(
+ KV.of("a", 1),
+ KV.of("a", 2),
+ KV.of("a", 3),
+ KV.of("b", 1),
+ KV.of("b", 10),
+ KV.of("b", 10),
+ KV.of("b", 100)
+ );
+
+ @Rule
+ public TestPipeline p = TestPipeline.create();
+
+ public PCollection<KV<String, Integer>> createInputTable(Pipeline p) {
+ return p.apply(Create.of(TABLE).withCoder(
+ KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+ }
- @Test
- @Category(NeedsRunner.class)
- public void testQuantilesGlobally() {
- PCollection<Integer> input = intRangeCollection(p, 101);
- PCollection<List<Integer>> quantiles =
- input.apply(ApproximateQuantiles.<Integer>globally(5));
+ @Test
+ @Category(NeedsRunner.class)
+ public void testQuantilesGlobally() {
+ PCollection<Integer> input = intRangeCollection(p, 101);
+ PCollection<List<Integer>> quantiles =
+ input.apply(ApproximateQuantiles.<Integer>globally(5));
- PAssert.that(quantiles)
- .containsInAnyOrder(Arrays.asList(0, 25, 50, 75, 100));
- p.run();
- }
+ PAssert.that(quantiles)
+ .containsInAnyOrder(Arrays.asList(0, 25, 50, 75, 100));
+ p.run();
+ }
- @Test
- @Category(NeedsRunner.class)
- public void testQuantilesGobally_comparable() {
- PCollection<Integer> input = intRangeCollection(p, 101);
- PCollection<List<Integer>> quantiles =
- input.apply(
- ApproximateQuantiles.globally(5, new DescendingIntComparator()));
-
- PAssert.that(quantiles)
- .containsInAnyOrder(Arrays.asList(100, 75, 50, 25, 0));
- p.run();
- }
+ @Test
+ @Category(NeedsRunner.class)
+ public void testQuantilesGobally_comparable() {
+ PCollection<Integer> input = intRangeCollection(p, 101);
+ PCollection<List<Integer>> quantiles =
+ input.apply(
+ ApproximateQuantiles.globally(5, new DescendingIntComparator()));
+
+ PAssert.that(quantiles)
+ .containsInAnyOrder(Arrays.asList(100, 75, 50, 25, 0));
+ p.run();
+ }
- @Test
- @Category(NeedsRunner.class)
- public void testQuantilesPerKey() {
- PCollection<KV<String, Integer>> input = createInputTable(p);
- PCollection<KV<String, List<Integer>>> quantiles = input.apply(
- ApproximateQuantiles.<String, Integer>perKey(2));
+ @Test
+ @Category(NeedsRunner.class)
+ public void testQuantilesPerKey() {
+ PCollection<KV<String, Integer>> input = createInputTable(p);
+ PCollection<KV<String, List<Integer>>> quantiles = input.apply(
+ ApproximateQuantiles.<String, Integer>perKey(2));
- PAssert.that(quantiles)
- .containsInAnyOrder(
- KV.of("a", Arrays.asList(1, 3)),
- KV.of("b", Arrays.asList(1, 100)));
- p.run();
+ PAssert.that(quantiles)
+ .containsInAnyOrder(
+ KV.of("a", Arrays.asList(1, 3)),
+ KV.of("b", Arrays.asList(1, 100)));
+ p.run();
- }
+ }
- @Test
- @Category(NeedsRunner.class)
- public void testQuantilesPerKey_reversed() {
- PCollection<KV<String, Integer>> input = createInputTable(p);
- PCollection<KV<String, List<Integer>>> quantiles = input.apply(
- ApproximateQuantiles.<String, Integer, DescendingIntComparator>perKey(
- 2, new DescendingIntComparator()));
-
- PAssert.that(quantiles)
- .containsInAnyOrder(
- KV.of("a", Arrays.asList(3, 1)),
- KV.of("b", Arrays.asList(100, 1)));
- p.run();
- }
+ @Test
+ @Category(NeedsRunner.class)
+ public void testQuantilesPerKey_reversed() {
+ PCollection<KV<String, Integer>> input = createInputTable(p);
+ PCollection<KV<String, List<Integer>>> quantiles = input.apply(
+ ApproximateQuantiles.<String, Integer, DescendingIntComparator>perKey(
+ 2, new DescendingIntComparator()));
+
+ PAssert.that(quantiles)
+ .containsInAnyOrder(
+ KV.of("a", Arrays.asList(3, 1)),
+ KV.of("b", Arrays.asList(100, 1)));
+ p.run();
+ }
- @Test
- public void testSingleton() {
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(5),
- Arrays.asList(389),
- Arrays.asList(389, 389, 389, 389, 389));
- }
+ @Test
+ public void testSingleton() {
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(5),
+ Arrays.asList(389),
+ Arrays.asList(389, 389, 389, 389, 389));
+ }
- @Test
- public void testSimpleQuantiles() {
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(5),
- intRange(101),
- Arrays.asList(0, 25, 50, 75, 100));
- }
+ @Test
+ public void testSimpleQuantiles() {
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(5),
+ intRange(101),
+ Arrays.asList(0, 25, 50, 75, 100));
+ }
- @Test
- public void testUnevenQuantiles() {
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(37),
- intRange(5000),
- quantileMatcher(5000, 37, 20 /* tolerance */));
- }
+ @Test
+ public void testUnevenQuantiles() {
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(37),
+ intRange(5000),
+ quantileMatcher(5000, 37, 20 /* tolerance */));
+ }
- @Test
- public void testLargerQuantiles() {
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(50),
- intRange(10001),
- quantileMatcher(10001, 50, 20 /* tolerance */));
- }
+ @Test
+ public void testLargerQuantiles() {
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(50),
+ intRange(10001),
+ quantileMatcher(10001, 50, 20 /* tolerance */));
+ }
- @Test
- public void testTightEpsilon() {
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(10).withEpsilon(0.01),
- intRange(10001),
- quantileMatcher(10001, 10, 5 /* tolerance */));
- }
+ @Test
+ public void testTightEpsilon() {
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(10).withEpsilon(0.01),
+ intRange(10001),
+ quantileMatcher(10001, 10, 5 /* tolerance */));
+ }
- @Test
- public void testDuplicates() {
- int size = 101;
- List<Integer> all = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- all.addAll(intRange(size));
+ @Test
+ public void testDuplicates() {
+ int size = 101;
+ List<Integer> all = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ all.addAll(intRange(size));
+ }
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(5),
+ all,
+ Arrays.asList(0, 25, 50, 75, 100));
}
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(5),
- all,
- Arrays.asList(0, 25, 50, 75, 100));
- }
- @Test
- public void testLotsOfDuplicates() {
- List<Integer> all = new ArrayList<>();
- all.add(1);
- for (int i = 1; i < 300; i++) {
- all.add(2);
+ @Test
+ public void testLotsOfDuplicates() {
+ List<Integer> all = new ArrayList<>();
+ all.add(1);
+ for (int i = 1; i < 300; i++) {
+ all.add(2);
+ }
+ for (int i = 300; i < 1000; i++) {
+ all.add(3);
+ }
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(5),
+ all,
+ Arrays.asList(1, 2, 3, 3, 3));
}
- for (int i = 300; i < 1000; i++) {
- all.add(3);
+
+ @Test
+ public void testLogDistribution() {
+ List<Integer> all = new ArrayList<>();
+ for (int i = 1; i < 1000; i++) {
+ all.add((int) Math.log(i));
+ }
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(5),
+ all,
+ Arrays.asList(0, 5, 6, 6, 6));
}
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(5),
- all,
- Arrays.asList(1, 2, 3, 3, 3));
- }
- @Test
- public void testLogDistribution() {
- List<Integer> all = new ArrayList<>();
- for (int i = 1; i < 1000; i++) {
- all.add((int) Math.log(i));
+ @Test
+ public void testZipfianDistribution() {
+ List<Integer> all = new ArrayList<>();
+ for (int i = 1; i < 1000; i++) {
+ all.add(1000 / i);
+ }
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<Integer>create(5),
+ all,
+ Arrays.asList(1, 1, 2, 4, 1000));
}
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(5),
- all,
- Arrays.asList(0, 5, 6, 6, 6));
- }
- @Test
- public void testZipfianDistribution() {
- List<Integer> all = new ArrayList<>();
- for (int i = 1; i < 1000; i++) {
- all.add(1000 / i);
+ @Test
+ public void testAlternateComparator() {
+ List<String> inputs = Arrays.asList(
+ "aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz");
+ testCombineFn(
+ ApproximateQuantilesCombineFn.<String>create(3),
+ inputs,
+ Arrays.asList("aa", "b", "zz"));
+ testCombineFn(
+ ApproximateQuantilesCombineFn.create(3, new OrderByLength()),
+ inputs,
+ Arrays.asList("b", "aaa", "ccccc"));
}
- testCombineFn(
- ApproximateQuantilesCombineFn.<Integer>create(5),
- all,
- Arrays.asList(1, 1, 2, 4, 1000));
- }
- @Test
- public void testAlternateComparator() {
- List<String> inputs = Arrays.asList(
- "aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz");
- testCombineFn(
- ApproximateQuantilesCombineFn.<String>create(3),
- inputs,
- Arrays.asList("aa", "b", "zz"));
- testCombineFn(
- ApproximateQuantilesCombineFn.create(3, new OrderByLength()),
- inputs,
- Arrays.asList("b", "aaa", "ccccc"));
- }
+ @Test
+ public void testDisplayData() {
+ Top.Natural<Integer> comparer = new Top.Natural<Integer>();
+ PTransform<?, ?> approxQuanitiles = ApproximateQuantiles.globally(20, comparer);
+ DisplayData displayData = DisplayData.from(approxQuanitiles);
- @Test
- public void testDisplayData() {
- Top.Natural<Integer> comparer = new Top.Natural<Integer>();
- PTransform<?, ?> approxQuanitiles = ApproximateQuantiles.globally(20, comparer);
- DisplayData displayData = DisplayData.from(approxQuanitiles);
+ assertThat(displayData, hasDisplayItem("numQuantiles", 20));
+ assertThat(displayData, hasDisplayItem("comparer", comparer.getClass()));
+ }
- assertThat(displayData, hasDisplayItem("numQuantiles", 20));
- assertThat(displayData, hasDisplayItem("comparer", comparer.getClass()));
- }
+ private Matcher<Iterable<? extends Integer>> quantileMatcher(
+ int size, int numQuantiles, int absoluteError) {
+ List<Matcher<? super Integer>> quantiles = new ArrayList<>();
+ quantiles.add(CoreMatchers.is(0));
+ for (int k = 1; k < numQuantiles - 1; k++) {
+ int expected = (int) (((double) (size - 1)) * k / (numQuantiles - 1));
+ quantiles.add(new Between<>(
+ expected - absoluteError, expected + absoluteError));
+ }
+ quantiles.add(CoreMatchers.is(size - 1));
+ return contains(quantiles);
+ }
+
+ private static class Between<T extends Comparable<T>>
+ extends TypeSafeDiagnosingMatcher<T> {
+ private final T min;
+ private final T max;
+
+ private Between(T min, T max) {
+ this.min = min;
+ this.max = max;
+ }
- private Matcher<Iterable<? extends Integer>> quantileMatcher(
- int size, int numQuantiles, int absoluteError) {
- List<Matcher<? super Integer>> quantiles = new ArrayList<>();
- quantiles.add(CoreMatchers.is(0));
- for (int k = 1; k < numQuantiles - 1; k++) {
- int expected = (int) (((double) (size - 1)) * k / (numQuantiles - 1));
- quantiles.add(new Between<>(
- expected - absoluteError, expected + absoluteError));
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("is between " + min + " and " + max);
+ }
+
+ @Override
+ protected boolean matchesSafely(T item, Description mismatchDescription) {
+ return min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
+ }
}
- quantiles.add(CoreMatchers.is(size - 1));
- return contains(quantiles);
- }
- private static class Between<T extends Comparable<T>>
- extends TypeSafeDiagnosingMatcher<T> {
- private final T min;
- private final T max;
- private Between(T min, T max) {
- this.min = min;
- this.max = max;
+ private static class DescendingIntComparator implements
+ SerializableComparator<Integer> {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return o2.compareTo(o1);
+ }
}
- @Override
- public void describeTo(Description description) {
- description.appendText("is between " + min + " and " + max);
+
+ private static class OrderByLength implements Comparator<String>, Serializable {
+ @Override
+ public int compare(String a, String b) {
+ if (a.length() != b.length()) {
+ return a.length() - b.length();
+ } else {
+ return a.compareTo(b);
+ }
+ }
}
- @Override
- protected boolean matchesSafely(T item, Description mismatchDescription) {
- return min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
+
+ private PCollection<Integer> intRangeCollection(Pipeline p, int size) {
+ return p.apply("CreateIntsUpTo(" + size + ")", Create.of(intRange(size)));
}
- }
- private static class DescendingIntComparator implements
- SerializableComparator<Integer> {
- @Override
- public int compare(Integer o1, Integer o2) {
- return o2.compareTo(o1);
+ private List<Integer> intRange(int size) {
+ List<Integer> all = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ all.add(i);
+ }
+ return all;
}
}
- private static class OrderByLength implements Comparator<String>, Serializable {
- @Override
- public int compare(String a, String b) {
- if (a.length() != b.length()) {
- return a.length() - b.length();
- } else {
- return a.compareTo(b);
+ /** Tests to ensure we are calculating the optimal buffers. */
+ @RunWith(Parameterized.class)
+ public static class BufferTests {
+
+ private final double epsilon;
+ private final long maxInputSize;
+ private final int expectedNumBuffers;
+ private final int expectedBufferSize;
+ private final ApproximateQuantilesCombineFn<?, ?> combineFn;
+
+ /**
+ * Test data taken from "Munro-Paterson Algorithm" reference values table of "Approximate
+ * Medians and other Quantiles in One Pass and with Limited Memory" paper.
+ *
+ * @see ApproximateQuantilesCombineFn for paper reference.
+ */
+ private static final double[] epsilons = new double[]{0.1, 0.05, 0.01, 0.005, 0.001};
+ private static final int[] maxElementExponents = new int[]{5, 6, 7, 8, 9};
+
+ private static final int[][] expectedNumBuffersValues = new int[][]{
+ {11, 14, 17, 21, 24},
+ {11, 14, 17, 20, 23},
+ {9, 11, 14, 17, 21},
+ {8, 11, 14, 17, 20},
+ {6, 9, 11, 14, 17},
+ };
+
+ private static final int[][] expectedBufferSizeValues = new int[][]{
+ {98, 123, 153, 96, 120},
+ {98, 123, 153, 191, 239},
+ {391, 977, 1221, 1526, 954},
+ {782, 977, 1221, 1526, 1908},
+ {3125, 3907, 9766, 12208, 15259},
+ };
+
+ @Parameterized.Parameters(name = "{index}: epsilon = {0}, maxInputSize = {1}")
+ public static Collection<Object[]> data() {
+ Collection<Object[]> testData = Lists.newArrayList();
+ for (int i = 0; i < epsilons.length; i++) {
+ for (int j = 0; j < maxElementExponents.length; j++) {
+ testData.add(new Object[]{
+ epsilons[i],
+ (long) Math.pow(10, maxElementExponents[j]),
+ expectedNumBuffersValues[i][j],
+ expectedBufferSizeValues[i][j]
+ });
+ }
}
+
+ return testData;
}
- }
+ public BufferTests(
+ Double epsilon, Long maxInputSize, Integer expectedNumBuffers, Integer expectedBufferSize) {
+ this.epsilon = epsilon;
+ this.maxInputSize = maxInputSize;
+ this.expectedNumBuffers = expectedNumBuffers;
+ this.expectedBufferSize = expectedBufferSize;
- private PCollection<Integer> intRangeCollection(Pipeline p, int size) {
- return p.apply("CreateIntsUpTo(" + size + ")", Create.of(intRange(size)));
- }
+ this.combineFn = ApproximateQuantilesCombineFn.create(
+ 10, new Top.Natural<Long>(), maxInputSize, epsilon);
+ }
+
+ /**
+ * Verify the buffers are efficiently calculated according to the reference table values.
+ */
+ @Test
+ public void testEfficiency() {
+ assertEquals("Number of buffers", expectedNumBuffers, combineFn.getNumBuffers());
+ assertEquals("Buffer size", expectedBufferSize, combineFn.getBufferSize());
+ }
- private List<Integer> intRange(int size) {
- List<Integer> all = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- all.add(i);
+    /**
+     * Verifies (b-2)2^(b-2) + 1/2 <= epsilon * N and k * 2^(b-1) >= N for the computed b and k.
+     */
+    @Test
+    public void testCorrectness() {
+      int b = combineFn.getNumBuffers();
+      int k = combineFn.getBufferSize();
+      long n = this.maxInputSize;
+      // 1L keeps the shift in long arithmetic so a large b cannot overflow an int.
+      assertThat(
+          "(b-2)2^(b-2) + 1/2 <= eN",
+          (b - 2) * (1L << (b - 2)) + 0.5,
+          Matchers.lessThanOrEqualTo(this.epsilon * n));
+      assertThat(
+          "k2^(b-1) >= N",
+          k * Math.pow(2, b - 1), // k * 2^(b-1), not (2k)^(b-1)
+          Matchers.greaterThanOrEqualTo((double) n));
+    }
- return all;
}
}