You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this format.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/28 19:41:35 UTC
[2/8] beam git commit: [BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline as a JUnit rule.
[BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9faa5aba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9faa5aba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9faa5aba
Branch: refs/heads/master
Commit: 9faa5abae4a58eb52efe37f3c29d691f016c2595
Parents: 3aaa1e3
Author: Stas Levin <st...@gmail.com>
Authored: Wed Dec 21 23:21:11 2016 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Dec 28 11:40:32 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/SampleTest.java | 388 ++++++++++---------
1 file changed, 211 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9faa5aba/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 9cc12d4..4e3b31c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -19,24 +19,22 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.sdk.TestUtils.LINES;
-import static org.apache.beam.sdk.TestUtils.NO_LINES;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -47,228 +45,264 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Suite;
/**
* Tests for Sample transform.
*/
-@RunWith(JUnit4.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ SampleTest.PickAnyTest.class,
+ SampleTest.MiscTest.class
+})
public class SampleTest {
- static final Integer[] EMPTY = new Integer[] { };
- static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5};
- static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
+ private static final Integer[] EMPTY = new Integer[] { };
+ private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5};
+ private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
/**
- * Verifies that the result of a Sample operation contains the expected number of elements,
- * and that those elements are a subset of the items in expected.
+ * Test variations for Sample transform.
*/
- @SuppressWarnings("rawtypes")
- public static class VerifyCorrectSample<T extends Comparable>
- implements SerializableFunction<Iterable<T>, Void> {
- private T[] expectedValues;
- private int expectedSize;
-
- /**
- * expectedSize is the number of elements that the Sample should contain. expected is the set
- * of elements that the sample may contain.
- */
- @SafeVarargs
- VerifyCorrectSample(int expectedSize, T... expected) {
- this.expectedValues = expected;
- this.expectedSize = expectedSize;
+ @RunWith(Parameterized.class)
+ public static class PickAnyTest {
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ @Parameterized.Parameters(name = "limit_{0}")
+ public static Iterable<Object[]> data() throws IOException {
+ return ImmutableList.<Object[]>builder()
+ .add(
+ new Object[] {
+ 0
+ },
+ new Object[] {
+ 1
+ },
+ new Object[] {
+ LINES.size() / 2
+ },
+ new Object[] {
+ LINES.size() * 2
+ },
+ new Object[] {
+ LINES.size() - 1
+ },
+ new Object[] {
+ LINES.size()
+ },
+ new Object[] {
+ LINES.size() + 1
+ }
+ )
+ .build();
}
- @Override
- @SuppressWarnings("unchecked")
- public Void apply(Iterable<T> in) {
- List<T> actual = new ArrayList<>();
- for (T elem : in) {
- actual.add(elem);
+ @Parameterized.Parameter
+ public int limit;
+
+ private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> {
+ private final List<String> lines;
+ private final int limit;
+ private VerifyAnySample(List<String> lines, int limit) {
+ this.lines = lines;
+ this.limit = limit;
}
- assertEquals(expectedSize, actual.size());
+ @Override
+ public Void apply(Iterable<String> actualIter) {
+ final int expectedSize = Math.min(limit, lines.size());
- Collections.sort(actual); // We assume that @expected is already sorted.
- int i = 0; // Index into @expected
- for (T s : actual) {
- boolean matchFound = false;
- for (; i < expectedValues.length; i++) {
- if (s.equals(expectedValues[i])) {
- matchFound = true;
- break;
- }
+ // Make sure actual is the right length, and is a
+ // subset of expected.
+ List<String> actual = new ArrayList<>();
+ for (String s : actualIter) {
+ actual.add(s);
}
- assertTrue("Invalid sample: " + Joiner.on(',').join(actual), matchFound);
- i++; // Don't match the same element again.
+ assertEquals(expectedSize, actual.size());
+ Set<String> actualAsSet = new TreeSet<>(actual);
+ Set<String> linesAsSet = new TreeSet<>(lines);
+ assertEquals(actual.size(), actualAsSet.size());
+ assertEquals(lines.size(), linesAsSet.size());
+ assertTrue(linesAsSet.containsAll(actualAsSet));
+ return null;
}
- return null;
}
- }
- @Rule
- public final transient TestPipeline pipeline = TestPipeline.create();
+ void runPickAnyTest(final List<String> lines, int limit) {
+ checkArgument(new HashSet<String>(lines).size() == lines.size(),
+ "Duplicates are unsupported.");
- @Test
- @Category(RunnableOnService.class)
- public void testSample() {
+ PCollection<String> input = p.apply(Create.of(lines)
+ .withCoder(StringUtf8Coder.of()));
- PCollection<Integer> input = pipeline.apply(Create.of(DATA)
- .withCoder(BigEndianIntegerCoder.of()));
- PCollection<Iterable<Integer>> output = input.apply(
- Sample.<Integer>fixedSizeGlobally(3));
+ PCollection<String> output =
+ input.apply(Sample.<String>any(limit));
- PAssert.thatSingletonIterable(output)
- .satisfies(new VerifyCorrectSample<>(3, DATA));
- pipeline.run();
- }
- @Test
- @Category(RunnableOnService.class)
- public void testSampleEmpty() {
+ PAssert.that(output)
+ .satisfies(new VerifyAnySample(lines, limit));
- PCollection<Integer> input = pipeline.apply(Create.of(EMPTY)
- .withCoder(BigEndianIntegerCoder.of()));
- PCollection<Iterable<Integer>> output = input.apply(
- Sample.<Integer>fixedSizeGlobally(3));
+ p.run();
+ }
- PAssert.thatSingletonIterable(output)
- .satisfies(new VerifyCorrectSample<>(0, EMPTY));
- pipeline.run();
+ @Test
+ @Category(RunnableOnService.class)
+ public void testPickAny() {
+ runPickAnyTest(LINES, limit);
+ }
}
- @Test
- @Category(RunnableOnService.class)
- public void testSampleZero() {
+ /**
+ * Further tests for Sample transform.
+ */
+ @RunWith(JUnit4.class)
+ public static class MiscTest {
- PCollection<Integer> input = pipeline.apply(Create.of(DATA)
- .withCoder(BigEndianIntegerCoder.of()));
- PCollection<Iterable<Integer>> output = input.apply(
- Sample.<Integer>fixedSizeGlobally(0));
+ @Rule
+ public final transient TestPipeline pipeline = TestPipeline.create();
- PAssert.thatSingletonIterable(output)
- .satisfies(new VerifyCorrectSample<>(0, DATA));
- pipeline.run();
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testSampleInsufficientElements() {
+ /**
+ * Verifies that the result of a Sample operation contains the expected number of elements,
+ * and that those elements are a subset of the items in expected.
+ */
+ @SuppressWarnings("rawtypes")
+ public static class VerifyCorrectSample<T extends Comparable>
+ implements SerializableFunction<Iterable<T>, Void> {
+ private T[] expectedValues;
+ private int expectedSize;
+
+ /**
+ * expectedSize is the number of elements that the Sample should contain. expected is the set
+ * of elements that the sample may contain.
+ */
+ @SafeVarargs
+ VerifyCorrectSample(int expectedSize, T... expected) {
+ this.expectedValues = expected;
+ this.expectedSize = expectedSize;
+ }
- PCollection<Integer> input = pipeline.apply(Create.of(DATA)
- .withCoder(BigEndianIntegerCoder.of()));
- PCollection<Iterable<Integer>> output = input.apply(
- Sample.<Integer>fixedSizeGlobally(10));
+ @Override
+ @SuppressWarnings("unchecked")
+ public Void apply(Iterable<T> in) {
+ List<T> actual = new ArrayList<>();
+ for (T elem : in) {
+ actual.add(elem);
+ }
- PAssert.thatSingletonIterable(output)
- .satisfies(new VerifyCorrectSample<>(5, DATA));
- pipeline.run();
- }
+ assertEquals(expectedSize, actual.size());
+
+ Collections.sort(actual); // We assume that @expected is already sorted.
+ int i = 0; // Index into @expected
+ for (T s : actual) {
+ boolean matchFound = false;
+ for (; i < expectedValues.length; i++) {
+ if (s.equals(expectedValues[i])) {
+ matchFound = true;
+ break;
+ }
+ }
+ assertTrue("Invalid sample: " + Joiner.on(',').join(actual), matchFound);
+ i++; // Don't match the same element again.
+ }
+ return null;
+ }
+ }
- @Test(expected = IllegalArgumentException.class)
- public void testSampleNegative() {
- pipeline.enableAbandonedNodeEnforcement(false);
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSample() {
- PCollection<Integer> input = pipeline.apply(Create.of(DATA)
- .withCoder(BigEndianIntegerCoder.of()));
- input.apply(Sample.<Integer>fixedSizeGlobally(-1));
- }
+ PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+ .withCoder(BigEndianIntegerCoder.of()));
+ PCollection<Iterable<Integer>> output = input.apply(
+ Sample.<Integer>fixedSizeGlobally(3));
- @Test
- @Category(RunnableOnService.class)
- public void testSampleMultiplicity() {
+ PAssert.thatSingletonIterable(output)
+ .satisfies(new VerifyCorrectSample<>(3, DATA));
+ pipeline.run();
+ }
- PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA)
- .withCoder(BigEndianIntegerCoder.of()));
- // At least one value must be selected with multiplicity.
- PCollection<Iterable<Integer>> output = input.apply(
- Sample.<Integer>fixedSizeGlobally(6));
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSampleEmpty() {
- PAssert.thatSingletonIterable(output)
- .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA));
- pipeline.run();
- }
+ PCollection<Integer> input = pipeline.apply(Create.of(EMPTY)
+ .withCoder(BigEndianIntegerCoder.of()));
+ PCollection<Iterable<Integer>> output = input.apply(
+ Sample.<Integer>fixedSizeGlobally(3));
- private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> {
- private final List<String> lines;
- private final int limit;
- private VerifyAnySample(List<String> lines, int limit) {
- this.lines = lines;
- this.limit = limit;
+ PAssert.thatSingletonIterable(output)
+ .satisfies(new VerifyCorrectSample<>(0, EMPTY));
+ pipeline.run();
}
- @Override
- public Void apply(Iterable<String> actualIter) {
- final int expectedSize = Math.min(limit, lines.size());
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSampleZero() {
- // Make sure actual is the right length, and is a
- // subset of expected.
- List<String> actual = new ArrayList<>();
- for (String s : actualIter) {
- actual.add(s);
- }
- assertEquals(expectedSize, actual.size());
- Set<String> actualAsSet = new TreeSet<>(actual);
- Set<String> linesAsSet = new TreeSet<>(lines);
- assertEquals(actual.size(), actualAsSet.size());
- assertEquals(lines.size(), linesAsSet.size());
- assertTrue(linesAsSet.containsAll(actualAsSet));
- return null;
- }
- }
+ PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+ .withCoder(BigEndianIntegerCoder.of()));
+ PCollection<Iterable<Integer>> output = input.apply(
+ Sample.<Integer>fixedSizeGlobally(0));
- void runPickAnyTest(final List<String> lines, int limit) {
- checkArgument(new HashSet<String>(lines).size() == lines.size(), "Duplicates are unsupported.");
- Pipeline p = TestPipeline.create();
+ PAssert.thatSingletonIterable(output)
+ .satisfies(new VerifyCorrectSample<>(0, DATA));
+ pipeline.run();
+ }
- PCollection<String> input = p.apply(Create.of(lines)
- .withCoder(StringUtf8Coder.of()));
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSampleInsufficientElements() {
- PCollection<String> output =
- input.apply(Sample.<String>any(limit));
+ PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+ .withCoder(BigEndianIntegerCoder.of()));
+ PCollection<Iterable<Integer>> output = input.apply(
+ Sample.<Integer>fixedSizeGlobally(10));
+ PAssert.thatSingletonIterable(output)
+ .satisfies(new VerifyCorrectSample<>(5, DATA));
+ pipeline.run();
+ }
- PAssert.that(output)
- .satisfies(new VerifyAnySample(lines, limit));
+ @Test(expected = IllegalArgumentException.class)
+ public void testSampleNegative() {
+ pipeline.enableAbandonedNodeEnforcement(false);
- p.run();
- }
+ PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+ .withCoder(BigEndianIntegerCoder.of()));
+ input.apply(Sample.<Integer>fixedSizeGlobally(-1));
+ }
- @Test
- @Category(RunnableOnService.class)
- public void testPickAny() {
- runPickAnyTest(LINES, 0);
- runPickAnyTest(LINES, LINES.size() / 2);
- runPickAnyTest(LINES, LINES.size() * 2);
- }
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSampleMultiplicity() {
- @Test
- // Extra tests, not worth the time to run on the real service.
- @Category(NeedsRunner.class)
- public void testPickAnyMore() {
- runPickAnyTest(LINES, LINES.size() - 1);
- runPickAnyTest(LINES, LINES.size());
- runPickAnyTest(LINES, LINES.size() + 1);
- }
+ PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA)
+ .withCoder(BigEndianIntegerCoder.of()));
+ // At least one value must be selected with multiplicity.
+ PCollection<Iterable<Integer>> output = input.apply(
+ Sample.<Integer>fixedSizeGlobally(6));
- @Test
- @Category(RunnableOnService.class)
- public void testPickAnyWhenEmpty() {
- runPickAnyTest(NO_LINES, 0);
- runPickAnyTest(NO_LINES, 1);
- }
+ PAssert.thatSingletonIterable(output)
+ .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA));
+ pipeline.run();
+ }
- @Test
- public void testSampleGetName() {
- assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
- }
+ @Test
+ public void testSampleGetName() {
+ assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
+ }
- @Test
- public void testDisplayData() {
- PTransform<?, ?> sampleAny = Sample.any(1234);
- DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny);
- assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234));
+ @Test
+ public void testDisplayData() {
+ PTransform<?, ?> sampleAny = Sample.any(1234);
+ DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny);
+ assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234));
- PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345);
- DisplayData perKeyDisplayData = DisplayData.from(samplePerKey);
- assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345));
+ PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345);
+ DisplayData perKeyDisplayData = DisplayData.from(samplePerKey);
+ assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345));
+ }
}
}