You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/28 19:41:35 UTC

[2/8] beam git commit: [BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline as a JUnit rule.

[BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9faa5aba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9faa5aba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9faa5aba

Branch: refs/heads/master
Commit: 9faa5abae4a58eb52efe37f3c29d691f016c2595
Parents: 3aaa1e3
Author: Stas Levin <st...@gmail.com>
Authored: Wed Dec 21 23:21:11 2016 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Dec 28 11:40:32 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/SampleTest.java  | 388 ++++++++++---------
 1 file changed, 211 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9faa5aba/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 9cc12d4..4e3b31c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -19,24 +19,22 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.sdk.TestUtils.LINES;
-import static org.apache.beam.sdk.TestUtils.NO_LINES;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -47,228 +45,264 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Suite;
 
 /**
  * Tests for Sample transform.
  */
-@RunWith(JUnit4.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    SampleTest.PickAnyTest.class,
+    SampleTest.MiscTest.class
+})
 public class SampleTest {
-  static final Integer[] EMPTY = new Integer[] { };
-  static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5};
-  static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
+  private static final Integer[] EMPTY = new Integer[] { };
+  private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5};
+  private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
 
   /**
-   * Verifies that the result of a Sample operation contains the expected number of elements,
-   * and that those elements are a subset of the items in expected.
+   * Test variations for Sample transform.
    */
-  @SuppressWarnings("rawtypes")
-  public static class VerifyCorrectSample<T extends Comparable>
-      implements SerializableFunction<Iterable<T>, Void> {
-    private T[] expectedValues;
-    private int expectedSize;
-
-    /**
-     * expectedSize is the number of elements that the Sample should contain. expected is the set
-     * of elements that the sample may contain.
-     */
-    @SafeVarargs
-    VerifyCorrectSample(int expectedSize, T... expected) {
-      this.expectedValues = expected;
-      this.expectedSize = expectedSize;
+  @RunWith(Parameterized.class)
+  public static class PickAnyTest {
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
+
+    @Parameterized.Parameters(name = "limit_{0}")
+    public static Iterable<Object[]> data() throws IOException {
+      return ImmutableList.<Object[]>builder()
+          .add(
+              new Object[] {
+                  0
+              },
+              new Object[] {
+                  1
+              },
+              new Object[] {
+                  LINES.size() / 2
+              },
+              new Object[] {
+                  LINES.size() * 2
+              },
+              new Object[] {
+                  LINES.size() - 1
+              },
+              new Object[] {
+                  LINES.size()
+              },
+              new Object[] {
+                  LINES.size() + 1
+              }
+              )
+          .build();
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public Void apply(Iterable<T> in) {
-      List<T> actual = new ArrayList<>();
-      for (T elem : in) {
-        actual.add(elem);
+    @Parameterized.Parameter
+    public int limit;
+
+    private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> {
+      private final List<String> lines;
+      private final int limit;
+      private VerifyAnySample(List<String> lines, int limit) {
+        this.lines = lines;
+        this.limit = limit;
       }
 
-      assertEquals(expectedSize, actual.size());
+      @Override
+      public Void apply(Iterable<String> actualIter) {
+        final int expectedSize = Math.min(limit, lines.size());
 
-      Collections.sort(actual);  // We assume that @expected is already sorted.
-      int i = 0;  // Index into @expected
-      for (T s : actual) {
-        boolean matchFound = false;
-        for (; i < expectedValues.length; i++) {
-          if (s.equals(expectedValues[i])) {
-            matchFound = true;
-            break;
-          }
+        // Make sure actual is the right length, and is a
+        // subset of expected.
+        List<String> actual = new ArrayList<>();
+        for (String s : actualIter) {
+          actual.add(s);
         }
-        assertTrue("Invalid sample: " +  Joiner.on(',').join(actual), matchFound);
-        i++;  // Don't match the same element again.
+        assertEquals(expectedSize, actual.size());
+        Set<String> actualAsSet = new TreeSet<>(actual);
+        Set<String> linesAsSet = new TreeSet<>(lines);
+        assertEquals(actual.size(), actualAsSet.size());
+        assertEquals(lines.size(), linesAsSet.size());
+        assertTrue(linesAsSet.containsAll(actualAsSet));
+        return null;
       }
-      return null;
     }
-  }
 
-  @Rule
-  public final transient TestPipeline pipeline = TestPipeline.create();
+    void runPickAnyTest(final List<String> lines, int limit) {
+      checkArgument(new HashSet<String>(lines).size() == lines.size(),
+                    "Duplicates are unsupported.");
 
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSample() {
+      PCollection<String> input = p.apply(Create.of(lines)
+                                                .withCoder(StringUtf8Coder.of()));
 
-    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
-        .withCoder(BigEndianIntegerCoder.of()));
-    PCollection<Iterable<Integer>> output = input.apply(
-        Sample.<Integer>fixedSizeGlobally(3));
+      PCollection<String> output =
+          input.apply(Sample.<String>any(limit));
 
-    PAssert.thatSingletonIterable(output)
-        .satisfies(new VerifyCorrectSample<>(3, DATA));
-    pipeline.run();
-  }
 
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSampleEmpty() {
+      PAssert.that(output)
+             .satisfies(new VerifyAnySample(lines, limit));
 
-    PCollection<Integer> input = pipeline.apply(Create.of(EMPTY)
-        .withCoder(BigEndianIntegerCoder.of()));
-    PCollection<Iterable<Integer>> output = input.apply(
-        Sample.<Integer>fixedSizeGlobally(3));
+      p.run();
+    }
 
-    PAssert.thatSingletonIterable(output)
-        .satisfies(new VerifyCorrectSample<>(0, EMPTY));
-    pipeline.run();
+    @Test
+    @Category(RunnableOnService.class)
+    public void testPickAny() {
+      runPickAnyTest(LINES, limit);
+    }
   }
 
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSampleZero() {
+  /**
+   * Further tests for Sample transform.
+   */
+  @RunWith(JUnit4.class)
+  public static class MiscTest {
 
-    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
-        .withCoder(BigEndianIntegerCoder.of()));
-    PCollection<Iterable<Integer>> output = input.apply(
-        Sample.<Integer>fixedSizeGlobally(0));
+    @Rule
+    public final transient TestPipeline pipeline = TestPipeline.create();
 
-    PAssert.thatSingletonIterable(output)
-        .satisfies(new VerifyCorrectSample<>(0, DATA));
-    pipeline.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSampleInsufficientElements() {
+    /**
+     * Verifies that the result of a Sample operation contains the expected number of elements,
+     * and that those elements are a subset of the items in expected.
+     */
+    @SuppressWarnings("rawtypes")
+    public static class VerifyCorrectSample<T extends Comparable>
+        implements SerializableFunction<Iterable<T>, Void> {
+      private T[] expectedValues;
+      private int expectedSize;
+
+      /**
+       * expectedSize is the number of elements that the Sample should contain. expected is the set
+       * of elements that the sample may contain.
+       */
+      @SafeVarargs
+      VerifyCorrectSample(int expectedSize, T... expected) {
+        this.expectedValues = expected;
+        this.expectedSize = expectedSize;
+      }
 
-    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
-        .withCoder(BigEndianIntegerCoder.of()));
-    PCollection<Iterable<Integer>> output = input.apply(
-        Sample.<Integer>fixedSizeGlobally(10));
+      @Override
+      @SuppressWarnings("unchecked")
+      public Void apply(Iterable<T> in) {
+        List<T> actual = new ArrayList<>();
+        for (T elem : in) {
+          actual.add(elem);
+        }
 
-    PAssert.thatSingletonIterable(output)
-        .satisfies(new VerifyCorrectSample<>(5, DATA));
-    pipeline.run();
-  }
+        assertEquals(expectedSize, actual.size());
+
+        Collections.sort(actual);  // We assume that @expected is already sorted.
+        int i = 0;  // Index into @expected
+        for (T s : actual) {
+          boolean matchFound = false;
+          for (; i < expectedValues.length; i++) {
+            if (s.equals(expectedValues[i])) {
+              matchFound = true;
+              break;
+            }
+          }
+          assertTrue("Invalid sample: " +  Joiner.on(',').join(actual), matchFound);
+          i++;  // Don't match the same element again.
+        }
+        return null;
+      }
+    }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testSampleNegative() {
-    pipeline.enableAbandonedNodeEnforcement(false);
+    @Test
+    @Category(RunnableOnService.class)
+    public void testSample() {
 
-    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
-        .withCoder(BigEndianIntegerCoder.of()));
-    input.apply(Sample.<Integer>fixedSizeGlobally(-1));
-  }
+      PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+                                                        .withCoder(BigEndianIntegerCoder.of()));
+      PCollection<Iterable<Integer>> output = input.apply(
+          Sample.<Integer>fixedSizeGlobally(3));
 
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSampleMultiplicity() {
+      PAssert.thatSingletonIterable(output)
+             .satisfies(new VerifyCorrectSample<>(3, DATA));
+      pipeline.run();
+    }
 
-    PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA)
-        .withCoder(BigEndianIntegerCoder.of()));
-    // At least one value must be selected with multiplicity.
-    PCollection<Iterable<Integer>> output = input.apply(
-        Sample.<Integer>fixedSizeGlobally(6));
+    @Test
+    @Category(RunnableOnService.class)
+    public void testSampleEmpty() {
 
-    PAssert.thatSingletonIterable(output)
-        .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA));
-    pipeline.run();
-  }
+      PCollection<Integer> input = pipeline.apply(Create.of(EMPTY)
+                                                        .withCoder(BigEndianIntegerCoder.of()));
+      PCollection<Iterable<Integer>> output = input.apply(
+          Sample.<Integer>fixedSizeGlobally(3));
 
-  private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> {
-    private final List<String> lines;
-    private final int limit;
-    private VerifyAnySample(List<String> lines, int limit) {
-      this.lines = lines;
-      this.limit = limit;
+      PAssert.thatSingletonIterable(output)
+             .satisfies(new VerifyCorrectSample<>(0, EMPTY));
+      pipeline.run();
     }
 
-    @Override
-    public Void apply(Iterable<String> actualIter) {
-      final int expectedSize = Math.min(limit, lines.size());
+    @Test
+    @Category(RunnableOnService.class)
+    public void testSampleZero() {
 
-      // Make sure actual is the right length, and is a
-      // subset of expected.
-      List<String> actual = new ArrayList<>();
-      for (String s : actualIter) {
-        actual.add(s);
-      }
-      assertEquals(expectedSize, actual.size());
-      Set<String> actualAsSet = new TreeSet<>(actual);
-      Set<String> linesAsSet = new TreeSet<>(lines);
-      assertEquals(actual.size(), actualAsSet.size());
-      assertEquals(lines.size(), linesAsSet.size());
-      assertTrue(linesAsSet.containsAll(actualAsSet));
-      return null;
-    }
-  }
+      PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+                                                        .withCoder(BigEndianIntegerCoder.of()));
+      PCollection<Iterable<Integer>> output = input.apply(
+          Sample.<Integer>fixedSizeGlobally(0));
 
-  void runPickAnyTest(final List<String> lines, int limit) {
-    checkArgument(new HashSet<String>(lines).size() == lines.size(), "Duplicates are unsupported.");
-    Pipeline p = TestPipeline.create();
+      PAssert.thatSingletonIterable(output)
+             .satisfies(new VerifyCorrectSample<>(0, DATA));
+      pipeline.run();
+    }
 
-    PCollection<String> input = p.apply(Create.of(lines)
-        .withCoder(StringUtf8Coder.of()));
+    @Test
+    @Category(RunnableOnService.class)
+    public void testSampleInsufficientElements() {
 
-    PCollection<String> output =
-        input.apply(Sample.<String>any(limit));
+      PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+                                                        .withCoder(BigEndianIntegerCoder.of()));
+      PCollection<Iterable<Integer>> output = input.apply(
+          Sample.<Integer>fixedSizeGlobally(10));
 
+      PAssert.thatSingletonIterable(output)
+             .satisfies(new VerifyCorrectSample<>(5, DATA));
+      pipeline.run();
+    }
 
-    PAssert.that(output)
-        .satisfies(new VerifyAnySample(lines, limit));
+    @Test(expected = IllegalArgumentException.class)
+    public void testSampleNegative() {
+      pipeline.enableAbandonedNodeEnforcement(false);
 
-    p.run();
-  }
+      PCollection<Integer> input = pipeline.apply(Create.of(DATA)
+                                                        .withCoder(BigEndianIntegerCoder.of()));
+      input.apply(Sample.<Integer>fixedSizeGlobally(-1));
+    }
 
-  @Test
-  @Category(RunnableOnService.class)
-  public void testPickAny() {
-    runPickAnyTest(LINES, 0);
-    runPickAnyTest(LINES, LINES.size() / 2);
-    runPickAnyTest(LINES, LINES.size() * 2);
-  }
+    @Test
+    @Category(RunnableOnService.class)
+    public void testSampleMultiplicity() {
 
-  @Test
-  // Extra tests, not worth the time to run on the real service.
-  @Category(NeedsRunner.class)
-  public void testPickAnyMore() {
-    runPickAnyTest(LINES, LINES.size() - 1);
-    runPickAnyTest(LINES, LINES.size());
-    runPickAnyTest(LINES, LINES.size() + 1);
-  }
+      PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA)
+                                                        .withCoder(BigEndianIntegerCoder.of()));
+      // At least one value must be selected with multiplicity.
+      PCollection<Iterable<Integer>> output = input.apply(
+          Sample.<Integer>fixedSizeGlobally(6));
 
-  @Test
-  @Category(RunnableOnService.class)
-  public void testPickAnyWhenEmpty() {
-    runPickAnyTest(NO_LINES, 0);
-    runPickAnyTest(NO_LINES, 1);
-  }
+      PAssert.thatSingletonIterable(output)
+             .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA));
+      pipeline.run();
+    }
 
-  @Test
-  public void testSampleGetName() {
-    assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
-  }
+    @Test
+    public void testSampleGetName() {
+      assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
+    }
 
-  @Test
-  public void testDisplayData() {
-    PTransform<?, ?> sampleAny = Sample.any(1234);
-    DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny);
-    assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234));
+    @Test
+    public void testDisplayData() {
+      PTransform<?, ?> sampleAny = Sample.any(1234);
+      DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny);
+      assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234));
 
-    PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345);
-    DisplayData perKeyDisplayData = DisplayData.from(samplePerKey);
-    assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345));
+      PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345);
+      DisplayData perKeyDisplayData = DisplayData.from(samplePerKey);
+      assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345));
+    }
   }
 }