You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/28 19:41:36 UTC
[3/8] beam git commit: [BEAM-1186] Broke ApproximateUniqueTest into 3
test classes that support TestPipeline as a JUnit rule.
[BEAM-1186] Broke ApproximateUniqueTest into 3 test classes that support TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3aaa1e3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3aaa1e3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3aaa1e3f
Branch: refs/heads/master
Commit: 3aaa1e3f4dfa51e595ecccae7f9afe40eb95a664
Parents: b538574
Author: Stas Levin <st...@gmail.com>
Authored: Wed Dec 21 23:00:39 2016 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Dec 28 11:40:32 2016 -0800
----------------------------------------------------------------------
.../sdk/transforms/ApproximateUniqueTest.java | 485 +++++++++++--------
1 file changed, 283 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3aaa1e3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 3afc759..77bbcfa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -25,12 +25,12 @@ import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -45,270 +45,351 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Suite;
/**
* Tests for the ApproximateUnique aggregator transform.
*/
-@RunWith(JUnit4.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ ApproximateUniqueTest.ApproximateUniqueWithDuplicatesTest.class,
+ ApproximateUniqueTest.ApproximateUniqueVariationsTest.class,
+ ApproximateUniqueTest.ApproximateUniqueMiscTest.class
+})
public class ApproximateUniqueTest implements Serializable {
// implements Serializable just to make it easy to use anonymous inner DoFn subclasses
@Rule
public final transient TestPipeline p = TestPipeline.create();
- @Test
- public void testEstimationErrorToSampleSize() {
- assertEquals(40000, ApproximateUnique.sampleSizeFromEstimationError(0.01));
- assertEquals(10000, ApproximateUnique.sampleSizeFromEstimationError(0.02));
- assertEquals(2500, ApproximateUnique.sampleSizeFromEstimationError(0.04));
- assertEquals(1600, ApproximateUnique.sampleSizeFromEstimationError(0.05));
- assertEquals(400, ApproximateUnique.sampleSizeFromEstimationError(0.1));
- assertEquals(100, ApproximateUnique.sampleSizeFromEstimationError(0.2));
- assertEquals(25, ApproximateUnique.sampleSizeFromEstimationError(0.4));
- assertEquals(16, ApproximateUnique.sampleSizeFromEstimationError(0.5));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testApproximateUniqueWithSmallInput() {
- PCollection<Integer> input = p.apply(
- Create.of(Arrays.asList(1, 2, 3, 3)));
-
- PCollection<Long> estimate = input
- .apply(ApproximateUnique.<Integer>globally(1000));
-
- PAssert.thatSingleton(estimate).isEqualTo(3L);
+ private static class VerifyEstimateFn implements SerializableFunction<Long, Void> {
+ private final long uniqueCount;
+ private final int sampleSize;
- p.run();
- }
+ private VerifyEstimateFn(final long uniqueCount, final int sampleSize) {
+ this.uniqueCount = uniqueCount;
+ this.sampleSize = sampleSize;
+ }
- @Test
- @Category(NeedsRunner.class)
- public void testApproximateUniqueWithDuplicates() {
- runApproximateUniqueWithDuplicates(100, 100, 100);
- runApproximateUniqueWithDuplicates(1000, 1000, 100);
- runApproximateUniqueWithDuplicates(1500, 1000, 100);
- runApproximateUniqueWithDuplicates(10000, 1000, 100);
+ @Override
+ public Void apply(final Long estimate) {
+ verifyEstimate(uniqueCount, sampleSize, estimate);
+ return null;
+ }
}
- private void runApproximateUniqueWithDuplicates(int elementCount,
- int uniqueCount, int sampleSize) {
-
- assert elementCount >= uniqueCount;
- List<Double> elements = Lists.newArrayList();
- for (int i = 0; i < elementCount; i++) {
- elements.add(1.0 / (i % uniqueCount + 1));
+ /**
+ * Checks that the estimation error, i.e., the difference between
+ * {@code uniqueCount} and {@code estimate} is less than
+ * {@code 2 / sqrt(sampleSize)}.
+ */
+ private static void verifyEstimate(final long uniqueCount,
+ final int sampleSize,
+ final long estimate) {
+ if (uniqueCount < sampleSize) {
+ assertEquals("Number of hashes is less than the sample size. "
+ + "Estimate should be exact", uniqueCount, estimate);
}
- Collections.shuffle(elements);
- Pipeline p = TestPipeline.create();
- PCollection<Double> input = p.apply(Create.of(elements));
- PCollection<Long> estimate =
- input.apply(ApproximateUnique.<Double>globally(sampleSize));
+ final double error = 100.0 * Math.abs(estimate - uniqueCount) / uniqueCount;
+ final double maxError = 100.0 * 2 / Math.sqrt(sampleSize);
- PAssert.thatSingleton(estimate).satisfies(new VerifyEstimateFn(uniqueCount, sampleSize));
+ assertTrue("Estimate= " + estimate + " Actual=" + uniqueCount + " Error="
+ + error + "%, MaxError=" + maxError + "%.", error < maxError);
- p.run();
+ assertTrue("Estimate= " + estimate + " Actual=" + uniqueCount + " Error="
+ + error + "%, MaxError=" + maxError + "%.", error < maxError);
}
- @Test
- @Category(NeedsRunner.class)
- public void testApproximateUniqueWithSkewedDistributions() {
- runApproximateUniqueWithSkewedDistributions(100, 100, 100);
- runApproximateUniqueWithSkewedDistributions(10000, 10000, 100);
- runApproximateUniqueWithSkewedDistributions(10000, 1000, 100);
- runApproximateUniqueWithSkewedDistributions(10000, 200, 100);
- }
+ private static class VerifyEstimatePerKeyFn
+ implements SerializableFunction<Iterable<KV<Long, Long>>, Void> {
- @Test
- @Category(NeedsRunner.class)
- public void testApproximateUniqueWithSkewedDistributionsAndLargeSampleSize() {
- runApproximateUniqueWithSkewedDistributions(10000, 2000, 1000);
- }
+ private final int sampleSize;
- private void runApproximateUniqueWithSkewedDistributions(int elementCount,
- final int uniqueCount, final int sampleSize) {
- List<Integer> elements = Lists.newArrayList();
- // Zipf distribution with approximately elementCount items.
- double s = 1 - 1.0 * uniqueCount / elementCount;
- double maxCount = Math.pow(uniqueCount, s);
- for (int k = 0; k < uniqueCount; k++) {
- int count = Math.max(1, (int) Math.round(maxCount * Math.pow(k, -s)));
- // Element k occurs count times.
- for (int c = 0; c < count; c++) {
- elements.add(k);
+ private VerifyEstimatePerKeyFn(final int sampleSize) {
+ this.sampleSize = sampleSize;
+ }
+
+ @Override
+ public Void apply(final Iterable<KV<Long, Long>> estimatePerKey) {
+ for (final KV<Long, Long> result : estimatePerKey) {
+ verifyEstimate(result.getKey(), sampleSize, result.getValue());
}
+ return null;
}
+ }
- Pipeline p = TestPipeline.create();
- PCollection<Integer> input = p.apply(Create.of(elements));
- PCollection<Long> estimate =
- input.apply(ApproximateUnique.<Integer>globally(sampleSize));
+ /**
+ * Tests for ApproximateUnique with duplicates.
+ */
+ @RunWith(Parameterized.class)
+ public static class ApproximateUniqueWithDuplicatesTest extends
+ ApproximateUniqueTest {
+
+ @Parameterized.Parameter
+ public int elementCount;
+ @Parameterized.Parameter(1)
+ public int uniqueCount;
+ @Parameterized.Parameter(2)
+ public int sampleSize;
+
+ @Parameterized.Parameters(name = "total_{0}_unique_{1}_sample_{2}")
+ public static Iterable<Object[]> data() throws IOException {
+ return ImmutableList.<Object[]>builder()
+ .add(
+ new Object[] {
+ 100, 100, 100
+ },
+ new Object[] {
+ 1000, 1000, 100
+ },
+ new Object[] {
+ 1500, 1000, 100
+ },
+ new Object[] {
+ 10000, 1000, 100
+ })
+ .build();
+ }
- PAssert.thatSingleton(estimate).satisfies(new VerifyEstimateFn(uniqueCount, sampleSize));
- p.run();
- }
+ private void runApproximateUniqueWithDuplicates(final int elementCount,
+ final int uniqueCount, final int sampleSize) {
- @Test
- @Category(NeedsRunner.class)
- public void testApproximateUniquePerKey() {
- List<KV<Long, Long>> elements = Lists.newArrayList();
- List<Long> keys = ImmutableList.of(20L, 50L, 100L);
- int elementCount = 1000;
- int sampleSize = 100;
- // Use the key as the number of unique values.
- for (long uniqueCount : keys) {
- for (long value = 0; value < elementCount; value++) {
- elements.add(KV.of(uniqueCount, value % uniqueCount));
+ assert elementCount >= uniqueCount;
+ final List<Double> elements = Lists.newArrayList();
+ for (int i = 0; i < elementCount; i++) {
+ elements.add(1.0 / (i % uniqueCount + 1));
}
- }
+ Collections.shuffle(elements);
- Pipeline p = TestPipeline.create();
- PCollection<KV<Long, Long>> input = p.apply(Create.of(elements));
- PCollection<KV<Long, Long>> counts =
- input.apply(ApproximateUnique.<Long, Long>perKey(sampleSize));
+ final PCollection<Double> input = p.apply(Create.of(elements));
+ final PCollection<Long> estimate =
+ input.apply(ApproximateUnique.<Double>globally(sampleSize));
- PAssert.that(counts).satisfies(new VerifyEstimatePerKeyFn(sampleSize));
+ PAssert.thatSingleton(estimate).satisfies(new VerifyEstimateFn(uniqueCount, sampleSize));
- p.run();
+ p.run();
+ }
- }
- /**
- * Applies {@link ApproximateUnique} for different sample sizes and verifies
- * that the estimation error falls within the maximum allowed error of
- * {@code 2 / sqrt(sampleSize)}.
- */
- @Test
- @Category(NeedsRunner.class)
- public void testApproximateUniqueWithDifferentSampleSizes() {
- runApproximateUniquePipeline(16);
- runApproximateUniquePipeline(64);
- runApproximateUniquePipeline(128);
- runApproximateUniquePipeline(256);
- runApproximateUniquePipeline(512);
- runApproximateUniquePipeline(1000);
- runApproximateUniquePipeline(1024);
- try {
- runApproximateUniquePipeline(15);
- fail("Accepted sampleSize < 16");
- } catch (IllegalArgumentException e) {
- assertTrue("Expected an exception due to sampleSize < 16", e.getMessage()
- .startsWith("ApproximateUnique needs a sampleSize >= 16"));
+ @Test
+ @Category(NeedsRunner.class)
+ public void testApproximateUniqueWithDuplicates() {
+ runApproximateUniqueWithDuplicates(elementCount, uniqueCount, sampleSize);
}
}
- @Test
- public void testApproximateUniqueGetName() {
- assertEquals("ApproximateUnique.PerKey", ApproximateUnique.<Long, Long>perKey(16).getName());
- assertEquals("ApproximateUnique.Globally", ApproximateUnique.<Integer>globally(16).getName());
- }
-
/**
- * Applies {@code ApproximateUnique(sampleSize)} verifying that the estimation
- * error falls within the maximum allowed error of {@code 2/sqrt(sampleSize)}.
+ * Tests for ApproximateUnique with different sample sizes.
*/
- private static void runApproximateUniquePipeline(int sampleSize) {
- Pipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of(TEST_LINES));
- PCollection<Long> approximate = input.apply(ApproximateUnique.<String>globally(sampleSize));
- final PCollectionView<Long> exact =
- input
- .apply(Distinct.<String>create())
- .apply(Count.<String>globally())
- .apply(View.<Long>asSingleton());
-
- PCollection<KV<Long, Long>> approximateAndExact = approximate
- .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element(), c.sideInput(exact)));
- }
- })
- .withSideInputs(exact));
-
- PAssert.that(approximateAndExact).satisfies(new VerifyEstimatePerKeyFn(sampleSize));
-
- p.run();
- }
+ @RunWith(Parameterized.class)
+ public static class ApproximateUniqueVariationsTest extends
+ ApproximateUniqueTest {
+
+ private static final int TEST_PAGES = 100;
+ private static final List<String> TEST_LINES =
+ new ArrayList<>(TEST_PAGES * TestUtils.LINES.size());
- private static final int TEST_PAGES = 100;
- private static final List<String> TEST_LINES =
- new ArrayList<>(TEST_PAGES * TestUtils.LINES.size());
+ static {
+ for (int i = 0; i < TEST_PAGES; i++) {
+ TEST_LINES.addAll(TestUtils.LINES);
+ }
+ }
- static {
- for (int i = 0; i < TEST_PAGES; i++) {
- TEST_LINES.addAll(TestUtils.LINES);
+ @Parameterized.Parameter
+ public int sampleSize;
+
+ /**
+ * Supplies the sample sizes to exercise: valid sizes from the minimum of 16 up to 1024,
+ * plus 15, which is below the minimum and is expected to be rejected.
+ */
+ @Parameterized.Parameters(name = "sampleSize_{0}")
+ public static Iterable<Object[]> data() throws IOException {
+ return ImmutableList.<Object[]>builder()
+ .add(new Object[] {
+ 16
+ },
+ new Object[] {
+ 64
+ },
+ new Object[] {
+ 128
+ },
+ new Object[] {
+ 256
+ },
+ new Object[] {
+ 512
+ },
+ new Object[] {
+ 1000
+ },
+ // 1024 matches the value covered before the test was parameterized.
+ new Object[] {
+ 1024
+ },
+ // Below the minimum sample size; exercises the rejection path.
+ new Object[] {
+ 15
+ })
+ .build();
+ }
+
+ /**
+ * Applies {@code ApproximateUnique(sampleSize)} verifying that the estimation
+ * error falls within the maximum allowed error of {@code 2/sqrt(sampleSize)}.
+ */
+ private void runApproximateUniquePipeline(final int sampleSize) {
+ final PCollection<String> input = p.apply(Create.of(TEST_LINES));
+ final PCollection<Long> approximate =
+ input.apply(ApproximateUnique.<String>globally(sampleSize));
+ final PCollectionView<Long> exact =
+ input
+ .apply(Distinct.<String>create())
+ .apply(Count.<String>globally())
+ .apply(View.<Long>asSingleton());
+
+ final PCollection<KV<Long, Long>> approximateAndExact = approximate
+ .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
+
+ @ProcessElement
+ public void processElement(final ProcessContext c) {
+ c.output(KV.of(c.element(), c.sideInput(exact)));
+ }
+ }).withSideInputs(exact));
+
+ PAssert.that(approximateAndExact).satisfies(new VerifyEstimatePerKeyFn(sampleSize));
+
+ p.run();
+ }
+
+ /**
+ * Applies {@link ApproximateUnique} for different sample sizes and verifies
+ * that the estimation error falls within the maximum allowed error of
+ * {@code 2 / sqrt(sampleSize)}.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testApproximateUniqueWithDifferentSampleSizes() {
+ if (sampleSize > 16) {
+ runApproximateUniquePipeline(sampleSize);
+ } else {
+ try {
+ p.enableAbandonedNodeEnforcement(false);
+ runApproximateUniquePipeline(15);
+ fail("Accepted sampleSize < 16");
+ } catch (final IllegalArgumentException e) {
+ assertTrue("Expected an exception due to sampleSize < 16",
+ e.getMessage().startsWith("ApproximateUnique needs a sampleSize >= 16"));
+
+ }
+ }
}
}
/**
- * Checks that the estimation error, i.e., the difference between
- * {@code uniqueCount} and {@code estimate} is less than
- * {@code 2 / sqrt(sampleSize}).
+ * Further tests for ApproximateUnique.
*/
- private static void verifyEstimate(long uniqueCount, int sampleSize, long estimate) {
- if (uniqueCount < sampleSize) {
- assertEquals("Number of hashes is less than the sample size. "
- + "Estimate should be exact", uniqueCount, estimate);
+ @RunWith(JUnit4.class)
+ public static class ApproximateUniqueMiscTest extends ApproximateUniqueTest {
+
+ @Test
+ public void testEstimationErrorToSampleSize() {
+ assertEquals(40000, ApproximateUnique.sampleSizeFromEstimationError(0.01));
+ assertEquals(10000, ApproximateUnique.sampleSizeFromEstimationError(0.02));
+ assertEquals(2500, ApproximateUnique.sampleSizeFromEstimationError(0.04));
+ assertEquals(1600, ApproximateUnique.sampleSizeFromEstimationError(0.05));
+ assertEquals(400, ApproximateUnique.sampleSizeFromEstimationError(0.1));
+ assertEquals(100, ApproximateUnique.sampleSizeFromEstimationError(0.2));
+ assertEquals(25, ApproximateUnique.sampleSizeFromEstimationError(0.4));
+ assertEquals(16, ApproximateUnique.sampleSizeFromEstimationError(0.5));
}
- double error = 100.0 * Math.abs(estimate - uniqueCount) / uniqueCount;
- double maxError = 100.0 * 2 / Math.sqrt(sampleSize);
+ @Test
+ @Category(RunnableOnService.class)
+ public void testApproximateUniqueWithSmallInput() {
+ final PCollection<Integer> input = p.apply(
+ Create.of(Arrays.asList(1, 2, 3, 3)));
- assertTrue("Estimate= " + estimate + " Actual=" + uniqueCount + " Error="
- + error + "%, MaxError=" + maxError + "%.", error < maxError);
+ final PCollection<Long> estimate = input
+ .apply(ApproximateUnique.<Integer>globally(1000));
- assertTrue("Estimate= " + estimate + " Actual=" + uniqueCount + " Error="
- + error + "%, MaxError=" + maxError + "%.", error < maxError);
- }
-
- private static class VerifyEstimateFn implements SerializableFunction<Long, Void> {
- private long uniqueCount;
- private int sampleSize;
+ PAssert.thatSingleton(estimate).isEqualTo(3L);
- public VerifyEstimateFn(long uniqueCount, int sampleSize) {
- this.uniqueCount = uniqueCount;
- this.sampleSize = sampleSize;
+ p.run();
}
- @Override
- public Void apply(Long estimate) {
- verifyEstimate(uniqueCount, sampleSize, estimate);
- return null;
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testApproximateUniqueWithSkewedDistributionsAndLargeSampleSize() {
+ runApproximateUniqueWithSkewedDistributions(10000, 2000, 1000);
}
- }
- private static class VerifyEstimatePerKeyFn
- implements SerializableFunction<Iterable<KV<Long, Long>>, Void> {
+ private void runApproximateUniqueWithSkewedDistributions(final int elementCount,
+ final int uniqueCount,
+ final int sampleSize) {
+ final List<Integer> elements = Lists.newArrayList();
+ // Zipf distribution with approximately elementCount items.
+ final double s = 1 - 1.0 * uniqueCount / elementCount;
+ final double maxCount = Math.pow(uniqueCount, s);
+ for (int k = 0; k < uniqueCount; k++) {
+ final int count = Math.max(1, (int) Math.round(maxCount * Math.pow(k, -s)));
+ // Element k occurs count times.
+ for (int c = 0; c < count; c++) {
+ elements.add(k);
+ }
+ }
- private int sampleSize;
+ final PCollection<Integer> input = p.apply(Create.of(elements));
+ final PCollection<Long> estimate =
+ input.apply(ApproximateUnique.<Integer>globally(sampleSize));
- public VerifyEstimatePerKeyFn(int sampleSize) {
- this.sampleSize = sampleSize;
+ PAssert.thatSingleton(estimate).satisfies(new VerifyEstimateFn(uniqueCount, sampleSize));
+
+ p.run();
}
- @Override
- public Void apply(Iterable<KV<Long, Long>> estimatePerKey) {
- for (KV<Long, Long> result : estimatePerKey) {
- verifyEstimate(result.getKey(), sampleSize, result.getValue());
+ @Test
+ @Category(NeedsRunner.class)
+ public void testApproximateUniquePerKey() {
+ final List<KV<Long, Long>> elements = Lists.newArrayList();
+ final List<Long> keys = ImmutableList.of(20L, 50L, 100L);
+ final int elementCount = 1000;
+ final int sampleSize = 100;
+ // Use the key as the number of unique values.
+ for (final long uniqueCount : keys) {
+ for (long value = 0; value < elementCount; value++) {
+ elements.add(KV.of(uniqueCount, value % uniqueCount));
+ }
}
- return null;
+
+ final PCollection<KV<Long, Long>> input = p.apply(Create.of(elements));
+ final PCollection<KV<Long, Long>> counts =
+ input.apply(ApproximateUnique.<Long, Long>perKey(sampleSize));
+
+ PAssert.that(counts).satisfies(new VerifyEstimatePerKeyFn(sampleSize));
+
+ p.run();
+
+ }
+
+ @Test
+ public void testApproximateUniqueGetName() {
+ assertEquals("ApproximateUnique.PerKey", ApproximateUnique.<Long, Long>perKey(16).getName());
+ assertEquals("ApproximateUnique.Globally", ApproximateUnique.<Integer>globally(16).getName());
}
- }
- @Test
- public void testDisplayData() {
- ApproximateUnique.Globally<Integer> specifiedSampleSize = ApproximateUnique.globally(1234);
- ApproximateUnique.PerKey<String, Integer> specifiedMaxError = ApproximateUnique.perKey(0.1234);
+ @Test
+ public void testDisplayData() {
+ final ApproximateUnique.Globally<Integer> specifiedSampleSize =
+ ApproximateUnique.globally(1234);
+ final ApproximateUnique.PerKey<String, Integer> specifiedMaxError =
+ ApproximateUnique.perKey(0.1234);
- assertThat(DisplayData.from(specifiedSampleSize), hasDisplayItem("sampleSize", 1234));
+ assertThat(DisplayData.from(specifiedSampleSize), hasDisplayItem("sampleSize", 1234));
- DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError);
- assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234));
- assertThat("calculated sampleSize should be included", maxErrorDisplayData,
- hasDisplayItem("sampleSize"));
+ final DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError);
+ assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234));
+ assertThat("calculated sampleSize should be included", maxErrorDisplayData,
+ hasDisplayItem("sampleSize"));
+ }
}
+
}