Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:51 UTC

[27/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
deleted file mode 100644
index b8f9b0b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Helper functions and test harnesses for checking correctness of {@link Source}
- * implementations.
- *
- * <p>Contains a few lightweight utilities (e.g. reading items from a source or a reader,
- * such as {@link #readFromSource} and {@link #readFromUnstartedReader}), as well as
- * heavyweight property-testing and stress-testing harnesses that help achieve a large
- * amount of test coverage with little code. The most notable ones are:
- * <ul>
- *   <li>{@link #assertSourcesEqualReferenceSource} helps test that the data read
- *   by the union of sources produced by {@link BoundedSource#splitIntoBundles}
- *   is the same as the data read by the original source.
- *   <li>If your source implements dynamic work rebalancing, use the
- *   {@code assertSplitAtFraction} family of functions; they test the behavior of
- *   {@link BoundedSource.BoundedReader#splitAtFraction}, in particular, that
- *   various consistency properties are respected and the total set of data read
- *   by the source is preserved when splits happen.
- *   Use {@link #assertSplitAtFractionBehavior} to test individual cases
- *   of {@code splitAtFraction} and use {@link #assertSplitAtFractionExhaustive}
- *   as a heavyweight stress test, including concurrency. We strongly recommend
- *   using both.
- * </ul>
- * For example usages, see the unit tests of classes such as
- * {@link com.google.cloud.dataflow.sdk.io.AvroSource} or
- * {@link com.google.cloud.dataflow.sdk.io.XmlSource}.
- *
- * <p>Like {@link DataflowAssert}, requires JUnit and Hamcrest to be present in the classpath.
- */
-public class SourceTestUtils {
-  // A wrapper around a value of type T that compares according to the structural
-  // value provided by a Coder<T>, but prints both the original and structural value,
-  // to help get good error messages from JUnit equality assertion failures and such.
-  private static class ReadableStructuralValue<T> {
-    private T originalValue;
-    private Object structuralValue;
-
-    public ReadableStructuralValue(T originalValue, Object structuralValue) {
-      this.originalValue = originalValue;
-      this.structuralValue = structuralValue;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(structuralValue);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof ReadableStructuralValue)) {
-        return false;
-      }
-      return Objects.equals(structuralValue, ((ReadableStructuralValue) obj).structuralValue);
-    }
-
-    @Override
-    public String toString() {
-      return String.format("[%s (structural %s)]", originalValue, structuralValue);
-    }
-  }
-
-  /**
-   * The testing utilities below depend on standard assertions and matchers to compare elements
-   * read by sources. In general the elements may not implement {@code equals}/{@code hashCode}
-   * properly; however, every source has a {@link Coder}, and every {@code Coder} can
-   * produce a {@link Coder#structuralValue} whose {@code equals}/{@code hashCode} is
-   * consistent with equality of the encoded form.
-   * We therefore use {@link Coder#structuralValue} to compare the elements read by sources.
-   */
-  public static <T> List<ReadableStructuralValue<T>> createStructuralValues(
-      Coder<T> coder, List<T> list)
-      throws Exception {
-    List<ReadableStructuralValue<T>> result = new ArrayList<>();
-    for (T elem : list) {
-      result.add(new ReadableStructuralValue<>(elem, coder.structuralValue(elem)));
-    }
-    return result;
-  }
-
-  /**
-   * Reads all elements from the given {@link BoundedSource}.
-   */
-  public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options)
-      throws IOException {
-    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
-      return readFromUnstartedReader(reader);
-    }
-  }
-
-  /**
-   * Reads all elements from the given unstarted {@link Source.Reader}.
-   */
-  public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException {
-    return readRemainingFromReader(reader, false);
-  }
-
-  /**
-   * Reads all elements from the given started {@link Source.Reader}.
-   */
-  public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException {
-    return readRemainingFromReader(reader, true);
-  }
-
-  /**
-   * Reads elements from a {@link Source.Reader} until {@code n} elements are read.
-   */
-  public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)
-      throws IOException {
-    return readNItemsFromReader(reader, n, false);
-  }
-
-  /**
-   * Reads elements from a {@link Source.Reader} that has already had {@link Source.Reader#start}
-   * called on it, until {@code n} elements are read.
-   */
-  public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n)
-      throws IOException {
-    return readNItemsFromReader(reader, n, true);
-  }
-
-  /**
-   * Reads elements from a {@link Source.Reader} until {@code n} elements are read.
-   *
-   * <p>There must be at least {@code n} elements remaining in the reader, except for
-   * the case when {@code n} is {@code Integer.MAX_VALUE}, which means "read all
-   * remaining elements".
-   */
-  private static <T> List<T> readNItemsFromReader(Source.Reader<T> reader, int n, boolean started)
-      throws IOException {
-    List<T> res = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
-      boolean shouldStart = (i == 0 && !started);
-      boolean more = shouldStart ? reader.start() : reader.advance();
-      if (n != Integer.MAX_VALUE) {
-        assertTrue(more);
-      }
-      if (!more) {
-        break;
-      }
-      res.add(reader.getCurrent());
-    }
-    return res;
-  }
-
-  /**
-   * Reads all remaining elements from a {@link Source.Reader}.
-   */
-  public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started)
-      throws IOException {
-    return readNItemsFromReader(reader, Integer.MAX_VALUE, started);
-  }
-
-  /**
-   * Given a reference {@code Source} and a list of {@code Source}s, asserts that the union of
-   * the records read from the list of sources is equal to the records read from the reference
-   * source.
-   */
-  public static <T> void assertSourcesEqualReferenceSource(
-      BoundedSource<T> referenceSource,
-      List<? extends BoundedSource<T>> sources,
-      PipelineOptions options)
-      throws Exception {
-    Coder<T> coder = referenceSource.getDefaultOutputCoder();
-    List<T> referenceRecords = readFromSource(referenceSource, options);
-    List<T> bundleRecords = new ArrayList<>();
-    for (BoundedSource<T> source : sources) {
-      assertThat(
-          "Coder type for source "
-              + source
-              + " is not compatible with Coder type for referenceSource "
-              + referenceSource,
-          source.getDefaultOutputCoder(),
-          equalTo(coder));
-      List<T> elems = readFromSource(source, options);
-      bundleRecords.addAll(elems);
-    }
-    List<ReadableStructuralValue<T>> bundleValues =
-        createStructuralValues(coder, bundleRecords);
-    List<ReadableStructuralValue<T>> referenceValues =
-        createStructuralValues(coder, referenceRecords);
-    assertThat(bundleValues, containsInAnyOrder(referenceValues.toArray()));
-  }
-
-  /**
-   * Asserts that the {@code Source} a {@code Reader} reports via {@code getCurrentSource()},
-   * when read from, produces the same records as the reader itself.
-   */
-  public static <T> void assertUnstartedReaderReadsSameAsItsSource(
-      BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
-    Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
-    List<T> expected = readFromUnstartedReader(reader);
-    List<T> actual = readFromSource(reader.getCurrentSource(), options);
-    List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
-    List<ReadableStructuralValue<T>> actualStructural = createStructuralValues(coder, actual);
-    assertThat(actualStructural, containsInAnyOrder(expectedStructural.toArray()));
-  }
-
-  /**
-   * Expected outcome of
-   * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader#splitAtFraction}.
-   */
-  public enum ExpectedSplitOutcome {
-    /**
-     * The operation must succeed and the results must be consistent.
-     */
-    MUST_SUCCEED_AND_BE_CONSISTENT,
-    /**
-     * The operation must fail (return {@code null}).
-     */
-    MUST_FAIL,
-    /**
-     * The operation must either fail, or succeed and the results be consistent.
-     */
-    MUST_BE_CONSISTENT_IF_SUCCEEDS
-  }
-
-  /**
-   * Contains two values: the number of items in the primary source, and the number of items in
-   * the residual source, or -1 if the split failed.
-   */
-  private static class SplitAtFractionResult {
-    public int numPrimaryItems;
-    public int numResidualItems;
-
-    public SplitAtFractionResult(int numPrimaryItems, int numResidualItems) {
-      this.numPrimaryItems = numPrimaryItems;
-      this.numResidualItems = numResidualItems;
-    }
-  }
-
-  /**
-   * Asserts that the {@code source}'s reader either fails to {@code splitAtFraction(fraction)}
-   * after reading {@code numItemsToReadBeforeSplit} items, or succeeds in a way that is
-   * consistent according to {@link #assertSplitAtFractionSucceedsAndConsistent}.
-   *
-   * <p>Returns a {@link SplitAtFractionResult}.
-   */
-  public static <T> SplitAtFractionResult assertSplitAtFractionBehavior(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      ExpectedSplitOutcome expectedOutcome,
-      PipelineOptions options)
-      throws Exception {
-    return assertSplitAtFractionBehaviorImpl(
-        source, readFromSource(source, options), numItemsToReadBeforeSplit, splitFraction,
-        expectedOutcome, options);
-  }
-
-  /**
-   * Compares two lists elementwise and throws a detailed assertion failure optimized for
-   * human reading if they are unequal.
-   */
-  private static <T> void assertListsEqualInOrder(
-      String message, String expectedLabel, List<T> expected, String actualLabel, List<T> actual) {
-    int i = 0;
-    for (; i < expected.size() && i < actual.size(); ++i) {
-      if (!Objects.equals(expected.get(i), actual.get(i))) {
-        Assert.fail(String.format(
-            "%s: %s and %s have %d items in common and then differ. "
-            + "Item in %s (%d more): %s, item in %s (%d more): %s",
-            message, expectedLabel, actualLabel, i,
-            expectedLabel, expected.size() - i - 1, expected.get(i),
-            actualLabel, actual.size() - i - 1, actual.get(i)));
-      }
-    }
-    if (i < expected.size() /* but i == actual.size() */) {
-      Assert.fail(String.format(
-          "%s: %s has %d more items after matching all %d from %s. First 5: %s",
-          message, expectedLabel, expected.size() - actual.size(), actual.size(), actualLabel,
-          expected.subList(actual.size(), Math.min(expected.size(), actual.size() + 5))));
-    } else if (i < actual.size() /* but i == expected.size() */) {
-      Assert.fail(String.format(
-          "%s: %s has %d more items after matching all %d from %s. First 5: %s",
-          message, actualLabel, actual.size() - expected.size(), expected.size(), expectedLabel,
-          actual.subList(expected.size(), Math.min(actual.size(), expected.size() + 5))));
-    } else {
-      // All is well.
-    }
-  }
-
-  private static <T> SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehaviorImpl(
-      BoundedSource<T> source, List<T> expectedItems, int numItemsToReadBeforeSplit,
-      double splitFraction, ExpectedSplitOutcome expectedOutcome, PipelineOptions options)
-      throws Exception {
-    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
-      BoundedSource<T> originalSource = reader.getCurrentSource();
-      List<T> currentItems = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplit);
-      BoundedSource<T> residual = reader.splitAtFraction(splitFraction);
-      if (residual != null) {
-        assertFalse(
-            String.format(
-                "Primary source didn't change after a successful split of %s at %f "
-                + "after reading %d items. "
-                + "Was the source object mutated instead of creating a new one? "
-                + "Source objects MUST be immutable.",
-                source, splitFraction, numItemsToReadBeforeSplit),
-            reader.getCurrentSource() == originalSource);
-        assertFalse(
-            String.format(
-                "Residual source equal to original source after a successful split of %s at %f "
-                + "after reading %d items. "
-                + "Was the source object mutated instead of creating a new one? "
-                + "Source objects MUST be immutable.",
-                source, splitFraction, numItemsToReadBeforeSplit),
-            reader.getCurrentSource() == residual);
-      }
-      // Failure cases are: must succeed but fails; must fail but succeeds.
-      switch (expectedOutcome) {
-        case MUST_SUCCEED_AND_BE_CONSISTENT:
-          assertNotNull(
-              "Failed to split reader of source: "
-                  + source
-                  + " at "
-                  + splitFraction
-                  + " after reading "
-                  + numItemsToReadBeforeSplit
-                  + " items",
-              residual);
-          break;
-        case MUST_FAIL:
-          assertEquals(null, residual);
-          break;
-        case MUST_BE_CONSISTENT_IF_SUCCEEDS:
-          // Nothing.
-          break;
-      }
-      currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0));
-      BoundedSource<T> primary = reader.getCurrentSource();
-      return verifySingleSplitAtFractionResult(
-          source, expectedItems, currentItems, primary, residual,
-          numItemsToReadBeforeSplit, splitFraction, options);
-    }
-  }
-
-  private static <T> SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFractionResult(
-      BoundedSource<T> source, List<T> expectedItems, List<T> currentItems,
-      BoundedSource<T> primary, BoundedSource<T> residual,
-      int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
-      throws Exception {
-    List<T> primaryItems = readFromSource(primary, options);
-    if (residual != null) {
-      List<T> residualItems = readFromSource(residual, options);
-      List<T> totalItems = new ArrayList<>();
-      totalItems.addAll(primaryItems);
-      totalItems.addAll(residualItems);
-      String errorMsgForPrimarySourceComp =
-          String.format(
-              "Continued reading after split yielded different items than primary source: "
-                  + "split at %s after reading %s items, original source: %s, primary source: %s",
-              splitFraction,
-              numItemsToReadBeforeSplit,
-              source,
-              primary);
-      String errorMsgForTotalSourceComp =
-          String.format(
-              "Items in primary and residual sources after split do not add up to items "
-                  + "in the original source. Split at %s after reading %s items; "
-                  + "original source: %s, primary: %s, residual: %s",
-              splitFraction,
-              numItemsToReadBeforeSplit,
-              source,
-              primary,
-              residual);
-      Coder<T> coder = primary.getDefaultOutputCoder();
-      List<ReadableStructuralValue<T>> primaryValues =
-          createStructuralValues(coder, primaryItems);
-      List<ReadableStructuralValue<T>> currentValues =
-          createStructuralValues(coder, currentItems);
-      List<ReadableStructuralValue<T>> expectedValues =
-          createStructuralValues(coder, expectedItems);
-      List<ReadableStructuralValue<T>> totalValues =
-          createStructuralValues(coder, totalItems);
-      assertListsEqualInOrder(
-          errorMsgForPrimarySourceComp, "current", currentValues, "primary", primaryValues);
-      assertListsEqualInOrder(
-          errorMsgForTotalSourceComp, "total", expectedValues, "primary+residual", totalValues);
-      return new SplitAtFractionResult(primaryItems.size(), residualItems.size());
-    }
-    return new SplitAtFractionResult(primaryItems.size(), -1);
-  }
-
-  /**
-   * Verifies some consistency properties of
-   * {@link BoundedSource.BoundedReader#splitAtFraction} on the given source. Equivalent to
-   * the following pseudocode:
-   * <pre>
-   *   Reader reader = source.createReader();
-   *   read N items from reader;
-   *   Source residual = reader.splitAtFraction(splitFraction);
-   *   Source primary = reader.getCurrentSource();
-   *   assert: items in primary == items we read so far
-   *                               + items we'll get by continuing to read from reader;
-   *   assert: items in original source == items in primary + items in residual
-   * </pre>
-   */
-  public static <T> void assertSplitAtFractionSucceedsAndConsistent(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      PipelineOptions options)
-      throws Exception {
-    assertSplitAtFractionBehavior(
-        source,
-        numItemsToReadBeforeSplit,
-        splitFraction,
-        ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT,
-        options);
-  }
-
-  /**
-   * Asserts that the {@code source}'s reader fails to {@code splitAtFraction(fraction)}
-   * after reading {@code numItemsToReadBeforeSplit} items.
-   */
-  public static <T> void assertSplitAtFractionFails(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      PipelineOptions options)
-      throws Exception {
-    assertSplitAtFractionBehavior(
-        source, numItemsToReadBeforeSplit, splitFraction, ExpectedSplitOutcome.MUST_FAIL, options);
-  }
-
-  private static class SplitFractionStatistics {
-    List<Double> successfulFractions = new ArrayList<>();
-    List<Double> nonTrivialFractions = new ArrayList<>();
-  }
-
-  /**
-   * Asserts that given a start position,
-   * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
-   * between two fractions that differ by at least one item) can be called successfully and the
-   * results are consistent if a split succeeds.
-   */
-  private static <T> void assertSplitAtFractionBinary(
-      BoundedSource<T> source,
-      List<T> expectedItems,
-      int numItemsToBeReadBeforeSplit,
-      double leftFraction,
-      SplitAtFractionResult leftResult,
-      double rightFraction,
-      SplitAtFractionResult rightResult,
-      PipelineOptions options,
-      SplitFractionStatistics stats)
-      throws Exception {
-    if (rightFraction - leftFraction < 0.001) {
-      // Do not recurse too deeply. Otherwise we will end up in infinite
-      // recursion, e.g., while trying to find the exact minimal fraction s.t.
-      // split succeeds. A precision of 0.001 when looking for such a fraction
-      // ought to be enough for everybody.
-      return;
-    }
-    double middleFraction = (rightFraction + leftFraction) / 2;
-    if (leftResult == null) {
-      leftResult = assertSplitAtFractionBehaviorImpl(
-          source, expectedItems, numItemsToBeReadBeforeSplit, leftFraction,
-          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    }
-    if (rightResult == null) {
-      rightResult = assertSplitAtFractionBehaviorImpl(
-          source, expectedItems, numItemsToBeReadBeforeSplit, rightFraction,
-          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    }
-    SplitAtFractionResult middleResult = assertSplitAtFractionBehaviorImpl(
-        source, expectedItems, numItemsToBeReadBeforeSplit, middleFraction,
-        ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    if (middleResult.numResidualItems != -1) {
-      stats.successfulFractions.add(middleFraction);
-    }
-    if (middleResult.numResidualItems > 0) {
-      stats.nonTrivialFractions.add(middleFraction);
-    }
-    // Two split fractions are equivalent if they yield the same number of
-    // items in primary vs. residual source. Left and right are already not
-    // equivalent. Recurse into [left, middle) and [middle, right) respectively
-    // if middle is not equivalent to left or right.
-    if (leftResult.numPrimaryItems != middleResult.numPrimaryItems) {
-      assertSplitAtFractionBinary(
-          source, expectedItems, numItemsToBeReadBeforeSplit,
-          leftFraction, leftResult, middleFraction, middleResult, options, stats);
-    }
-    if (rightResult.numPrimaryItems != middleResult.numPrimaryItems) {
-      assertSplitAtFractionBinary(
-          source, expectedItems, numItemsToBeReadBeforeSplit,
-          middleFraction, middleResult, rightFraction, rightResult, options, stats);
-    }
-  }
-
-  /**
-   * Asserts that for each possible start position,
-   * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
-   * between two fractions that differ by at least one item) can be called successfully and the
-   * results are consistent if a split succeeds. Verifies multithreaded splitting as well.
-   */
-  public static <T> void assertSplitAtFractionExhaustive(
-      BoundedSource<T> source, PipelineOptions options) throws Exception {
-    List<T> expectedItems = readFromSource(source, options);
-    assertFalse("Empty source", expectedItems.isEmpty());
-    assertFalse("Source reads a single item", expectedItems.size() == 1);
-    List<List<Double>> allNonTrivialFractions = new ArrayList<>();
-    {
-      boolean anySuccessfulFractions = false;
-      boolean anyNonTrivialFractions = false;
-      for (int i = 0; i < expectedItems.size(); i++) {
-        SplitFractionStatistics stats = new SplitFractionStatistics();
-        assertSplitAtFractionBinary(source, expectedItems, i,
-            0.0, null, 1.0, null, options, stats);
-        if (!stats.successfulFractions.isEmpty()) {
-          anySuccessfulFractions = true;
-        }
-        if (!stats.nonTrivialFractions.isEmpty()) {
-          anyNonTrivialFractions = true;
-        }
-        allNonTrivialFractions.add(stats.nonTrivialFractions);
-      }
-      assertTrue(
-          "splitAtFraction test completed vacuously: no successful split fractions found",
-          anySuccessfulFractions);
-      assertTrue(
-          "splitAtFraction test completed vacuously: no non-trivial split fractions found",
-          anyNonTrivialFractions);
-    }
-    {
-      // Perform a stress test of "racy" concurrent splitting:
-      // for every position (number of items read), try to split at the minimum nontrivial
-      // split fraction for that position concurrently with reading the record at that position.
-      // To ensure that the test is non-vacuous, make sure that the splitting succeeds
-      // at least once and fails at least once.
-      ExecutorService executor = Executors.newFixedThreadPool(2);
-      for (int i = 0; i < expectedItems.size(); i++) {
-        double minNonTrivialFraction = 2.0;  // Greater than any possible fraction.
-        for (double fraction : allNonTrivialFractions.get(i)) {
-          minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
-        }
-        if (minNonTrivialFraction == 2.0) {
-          // This cannot happen for every position; otherwise the vacuousness
-          // check above would have failed.
-          continue;
-        }
-        boolean haveSuccess = false, haveFailure = false;
-        while (!haveSuccess || !haveFailure) {
-          if (assertSplitAtFractionConcurrent(
-              executor, source, expectedItems, i, minNonTrivialFraction, options)) {
-            haveSuccess = true;
-          } else {
-            haveFailure = true;
-          }
-        }
-      }
-    }
-  }
-
-  private static <T> boolean assertSplitAtFractionConcurrent(
-      ExecutorService executor, BoundedSource<T> source, List<T> expectedItems,
-      final int numItemsToReadBeforeSplitting, final double fraction, PipelineOptions options)
-      throws Exception {
-    @SuppressWarnings("resource")  // Closed in readerThread
-    final BoundedSource.BoundedReader<T> reader = source.createReader(options);
-    final CountDownLatch unblockSplitter = new CountDownLatch(1);
-    Future<List<T>> readerThread =
-        executor.submit(
-            new Callable<List<T>>() {
-              @Override
-              public List<T> call() throws Exception {
-                try {
-                  List<T> items =
-                      readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplitting);
-                  unblockSplitter.countDown();
-                  items.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplitting > 0));
-                  return items;
-                } finally {
-                  reader.close();
-                }
-              }
-            });
-    Future<KV<BoundedSource<T>, BoundedSource<T>>> splitterThread = executor.submit(
-        new Callable<KV<BoundedSource<T>, BoundedSource<T>>>() {
-          @Override
-          public KV<BoundedSource<T>, BoundedSource<T>> call() throws Exception {
-            unblockSplitter.await();
-            BoundedSource<T> residual = reader.splitAtFraction(fraction);
-            if (residual == null) {
-              return null;
-            }
-            return KV.of(reader.getCurrentSource(), residual);
-          }
-        });
-    List<T> currentItems = readerThread.get();
-    KV<BoundedSource<T>, BoundedSource<T>> splitSources = splitterThread.get();
-    if (splitSources == null) {
-      return false;
-    }
-    SplitAtFractionResult res = verifySingleSplitAtFractionResult(
-        source, expectedItems, currentItems, splitSources.getKey(), splitSources.getValue(),
-        numItemsToReadBeforeSplitting, fraction, options);
-    return (res.numResidualItems > 0);
-  }
-}
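
For reference, a unit test for a BoundedSource typically exercised the harnesses
above along these lines (a minimal sketch: MyLineSource is a hypothetical
BoundedSource<String> implementation and the bundle size is arbitrary; the
SourceTestUtils calls match the utilities in the deleted file):

    import com.google.cloud.dataflow.sdk.io.BoundedSource;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
    import java.util.List;

    PipelineOptions options = PipelineOptionsFactory.create();
    BoundedSource<String> source = new MyLineSource("/tmp/input.txt");  // hypothetical

    // The union of the bundles produced by splitting must contain exactly
    // the records of the unsplit source.
    List<? extends BoundedSource<String>> bundles =
        source.splitIntoBundles(1024 /* desiredBundleSizeBytes */, options);
    SourceTestUtils.assertSourcesEqualReferenceSource(source, bundles, options);

    // Stress-test splitAtFraction at every position and every interesting
    // fraction, including racy concurrent splits.
    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);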

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 1afb691..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index 9fff070..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
-  private static final String TENTATIVE_COUNTER = "tentative";
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-
-  private final TestDataflowPipelineOptions options;
-  private final DataflowPipelineRunner runner;
-  private int expectedNumberOfAssertions = 0;
-
-  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
-    this.options = options;
-    this.runner = DataflowPipelineRunner.fromOptions(options);
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static TestDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-
-    return new TestDataflowPipelineRunner(dataflowOptions);
-  }
-
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    return run(pipeline, runner);
-  }
-
-  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
-
-    final JobMessagesHandler messageHandler =
-        new MonitoringUtil.PrintHandler(options.getJobMessageOutput());
-    final DataflowPipelineJob job;
-    try {
-      job = runner.run(pipeline);
-    } catch (DataflowJobExecutionException ex) {
-      throw new IllegalStateException("The dataflow failed.", ex);
-    }
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    try {
-      final Optional<Boolean> result;
-      if (options.isStreaming()) {
-        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
-            new Callable<Optional<Boolean>>() {
-          @Override
-          public Optional<Boolean> call() throws Exception {
-            try {
-              for (;;) {
-                Optional<Boolean> result = checkForSuccess(job);
-                if (result.isPresent()) {
-                  return result;
-                }
-                Thread.sleep(10000L);
-              }
-            } finally {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-          }
-        });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
-            @Override
-            public void process(List<JobMessage> messages) {
-              messageHandler.process(messages);
-              for (JobMessage message : messages) {
-                if (message.getMessageImportance() != null
-                    && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-                  LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
-                      job.getJobId(), message.getMessageText());
-                  try {
-                    job.cancel();
-                  } catch (Exception e) {
-                    throw Throwables.propagate(e);
-                  }
-                }
-              }
-            }
-          });
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
-              job.getJobId());
-          job.cancel();
-        }
-        result = resultFuture.get();
-      } else {
-        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
-        result = checkForSuccess(job);
-      }
-      if (!result.isPresent()) {
-        throw new IllegalStateException(
-            "The dataflow did not output a success or failure metric.");
-      } else if (!result.get()) {
-        throw new IllegalStateException("The dataflow failed.");
-      }
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw Throwables.propagate(e);
-    }
-    return job;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (transform instanceof DataflowAssert.OneSideInputAssert
-        || transform instanceof DataflowAssert.TwoSideInputAssert) {
-      expectedNumberOfAssertions += 1;
-    }
-
-    return runner.apply(transform, input);
-  }
-
-  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
-      throws IOException {
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("The pipeline failed");
-      return Optional.of(false);
-    }
-
-    JobMetrics metrics = job.getDataflowClient().projects().jobs()
-        .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-    } else {
-      int successes = 0;
-      int failures = 0;
-      for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null || metric.getName().getContext() == null
-            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-          // Don't double count using the non-tentative version of the metric.
-          continue;
-        }
-        if (DataflowAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-          successes += ((BigDecimal) metric.getScalar()).intValue();
-        } else if (DataflowAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
-          failures += ((BigDecimal) metric.getScalar()).intValue();
-        }
-      }
-
-      if (failures > 0) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(false);
-      } else if (successes >= expectedNumberOfAssertions) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(true);
-      }
-
-      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
-          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
-    }
-
-    return Optional.<Boolean>absent();
-  }
-
-  @Override
-  public String toString() {
-    return "TestDataflowPipelineRunner#" + options.getAppName();
-  }
-}
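
For reference, the runner above was selected through the pipeline options; a
minimal sketch (the project id and temp location are placeholders, and imports
follow the SDK packages shown in the deleted file):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    TestDataflowPipelineOptions options =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    options.setRunner(TestDataflowPipelineRunner.class);
    options.setProject("my-project");               // placeholder
    options.setTempLocation("gs://my-bucket/tmp");  // placeholder

    Pipeline p = TestPipeline.fromOptions(options);
    PCollection<Integer> output = p.apply(Create.of(1, 2, 3));
    DataflowAssert.that(output).containsInAnyOrder(1, 2, 3);
    // run() submits the job; checkForSuccess() above then polls the
    // DataflowAssert success/failure counters to decide the verdict.
    p.run();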

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
deleted file mode 100644
index a05a778..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
-/**
- * A creator of test pipelines for use inside tests; the pipelines can be
- * configured to run locally or against the live service.
- *
- * <p>It is recommended to tag hand-selected tests for this purpose using the
- * RunnableOnService Category annotation, as each test run against the service
- * will spin up and tear down a single VM.
- *
- * <p>In order to run tests on the Dataflow pipeline service, the following
- * conditions must be met:
- * <ul>
- * <li> The system property {@code runIntegrationTestOnService} must be set to {@code true}.
- * <li> The system property {@code projectName} must be set to your Cloud project.
- * <li> The system property {@code temp_gcs_directory} must be set to a valid GCS bucket.
- * <li> Jars containing the SDK and test classes must be added to the test classpath.
- * </ul>
- *
- * <p>Use {@link DataflowAssert} for tests, as it integrates with this test
- * harness in both direct and remote execution modes.  For example:
- *
- * <pre>{@code
- * Pipeline p = TestPipeline.create();
- * PCollection<Integer> output = ...
- *
- * DataflowAssert.that(output)
- *     .containsInAnyOrder(1, 2, 3, 4);
- * p.run();
- * }</pre>
- *
- */
-public class TestPipeline extends Pipeline {
-  private static final String PROPERTY_DATAFLOW_OPTIONS = "dataflowOptions";
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  /**
-   * Creates and returns a new test pipeline.
-   *
-   * <p>Use {@link DataflowAssert} to add tests, then call
-   * {@link Pipeline#run} to execute the pipeline and check the tests.
-   */
-  public static TestPipeline create() {
-    return fromOptions(testingPipelineOptions());
-  }
-
-  public static TestPipeline fromOptions(PipelineOptions options) {
-    return new TestPipeline(PipelineRunner.fromOptions(options), options);
-  }
-
-  /**
-   * Returns whether this {@link TestPipeline} supports dynamic work rebalancing, in which case
-   * tests of dynamic work rebalancing are expected to pass.
-   */
-  public boolean supportsDynamicWorkRebalancing() {
-    return getRunner() instanceof DataflowPipelineRunner;
-  }
-
-  private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
-    super(runner, options);
-  }
-
-  /**
-   * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError}
-   * that is raised during testing.
-   */
-  @Override
-  public PipelineResult run() {
-    try {
-      return super.run();
-    } catch (RuntimeException exc) {
-      Throwable cause = exc.getCause();
-      if (cause instanceof AssertionError) {
-        throw (AssertionError) cause;
-      } else {
-        throw exc;
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
-  }
-
-  /**
-   * Creates {@link PipelineOptions} for testing.
-   */
-  public static PipelineOptions testingPipelineOptions() {
-    try {
-      @Nullable String systemDataflowOptions = System.getProperty(PROPERTY_DATAFLOW_OPTIONS);
-      PipelineOptions options =
-          systemDataflowOptions == null
-              ? PipelineOptionsFactory.create()
-              : PipelineOptionsFactory.fromArgs(
-                      MAPPER.readValue(
-                          System.getProperty(PROPERTY_DATAFLOW_OPTIONS), String[].class))
-                  .as(PipelineOptions.class);
-
-      options.as(ApplicationNameOptions.class).setAppName(getAppName());
-      if (isIntegrationTest()) {
-        // TODO: adjust everyone's integration test frameworks to set the runner class via the
-        // pipeline options in PROPERTY_DATAFLOW_OPTIONS.
-        options.setRunner(TestDataflowPipelineRunner.class);
-      } else {
-        options.as(GcpOptions.class).setGcpCredential(new TestCredential());
-      }
-      options.setStableUniqueNames(CheckEnabled.ERROR);
-      return options;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to instantiate test options from system property "
-          + PROPERTY_DATAFLOW_OPTIONS + ": " + System.getProperty(PROPERTY_DATAFLOW_OPTIONS), e);
-    }
-  }
-
-  /**
-   * Returns whether a {@link TestPipeline} should be treated as an integration test.
-   */
-  private static boolean isIntegrationTest() {
-    return Boolean.parseBoolean(System.getProperty("runIntegrationTestOnService"));
-  }
-
-  /** Returns the class + method name of the test, or a default name. */
-  private static String getAppName() {
-    Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();
-    if (stackTraceElement.isPresent()) {
-      String methodName = stackTraceElement.get().getMethodName();
-      String className = stackTraceElement.get().getClassName();
-      if (className.contains(".")) {
-        className = className.substring(className.lastIndexOf(".") + 1);
-      }
-      return className + "-" + methodName;
-    }
-    return "UnitTest";
-  }
-
-  /** Returns the {@link StackTraceElement} of the calling class. */
-  private static Optional<StackTraceElement> findCallersStackTrace() {
-    Iterator<StackTraceElement> elements =
-        Iterators.forArray(Thread.currentThread().getStackTrace());
-    // First find the TestPipeline class in the stack trace.
-    while (elements.hasNext()) {
-      StackTraceElement next = elements.next();
-      if (TestPipeline.class.getName().equals(next.getClassName())) {
-        break;
-      }
-    }
-    // Then find the first element after that which is not in the TestPipeline class.
-    while (elements.hasNext()) {
-      StackTraceElement next = elements.next();
-      if (!TestPipeline.class.getName().equals(next.getClassName())) {
-        return Optional.of(next);
-      }
-    }
-    return Optional.absent();
-  }
-}
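
For reference, testingPipelineOptions() above parses the "dataflowOptions"
system property as a JSON array of command-line flags; a minimal sketch of the
equivalent wiring (the project id is a placeholder):

    // Equivalent of running the JVM with
    //   -DdataflowOptions='["--project=my-project"]'
    System.setProperty("dataflowOptions", "[\"--project=my-project\"]");
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    Pipeline p = TestPipeline.fromOptions(options);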

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java
deleted file mode 100644
index dc0baf5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-
-import org.joda.time.Instant;
-import org.joda.time.ReadableInstant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * A utility class for testing {@link WindowFn}s.
- */
-public class WindowFnTestUtils {
-
-  /**
-   * Creates a Set of elements to be used as expected output in
-   * {@link #runWindowFn}.
-   */
-  public static Set<String> set(long... timestamps) {
-    Set<String> result = new HashSet<>();
-    for (long timestamp : timestamps) {
-      result.add(timestampValue(timestamp));
-    }
-    return result;
-  }
-
-  /**
-   * Runs the {@link WindowFn} over the provided input, returning a map
-   * of windows to the stringified timestamps in those windows.
-   */
-  public static <T, W extends BoundedWindow> Map<W, Set<String>> runWindowFn(
-      WindowFn<T, W> windowFn,
-      List<Long> timestamps) throws Exception {
-
-    final TestWindowSet<W, String> windowSet = new TestWindowSet<W, String>();
-    for (final Long timestamp : timestamps) {
-      for (W window : windowFn.assignWindows(
-          new TestAssignContext<T, W>(new Instant(timestamp), windowFn))) {
-        windowSet.put(window, timestampValue(timestamp));
-      }
-      windowFn.mergeWindows(new TestMergeContext<T, W>(windowSet, windowFn));
-    }
-    Map<W, Set<String>> actual = new HashMap<>();
-    for (W window : windowSet.windows()) {
-      actual.put(window, windowSet.get(window));
-    }
-    return actual;
-  }
-
-  public static <T, W extends BoundedWindow> Collection<W> assignedWindows(
-      WindowFn<T, W> windowFn, long timestamp) throws Exception {
-    return windowFn.assignWindows(new TestAssignContext<T, W>(new Instant(timestamp), windowFn));
-  }
-
-  private static String timestampValue(long timestamp) {
-    return "T" + new Instant(timestamp);
-  }
-
-  /**
-   * Test implementation of AssignContext.
-   */
-  private static class TestAssignContext<T, W extends BoundedWindow>
-      extends WindowFn<T, W>.AssignContext {
-    private Instant timestamp;
-
-    public TestAssignContext(Instant timestamp, WindowFn<T, W> windowFn) {
-      windowFn.super();
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public T element() {
-      return null;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return null;
-    }
-  }
-
-  /**
-   * Test implementation of MergeContext.
-   */
-  private static class TestMergeContext<T, W extends BoundedWindow>
-    extends WindowFn<T, W>.MergeContext {
-    private TestWindowSet<W, ?> windowSet;
-
-    public TestMergeContext(
-        TestWindowSet<W, ?> windowSet, WindowFn<T, W> windowFn) {
-      windowFn.super();
-      this.windowSet = windowSet;
-    }
-
-    @Override
-    public Collection<W> windows() {
-      return windowSet.windows();
-    }
-
-    @Override
-    public void merge(Collection<W> toBeMerged, W mergeResult) {
-      windowSet.merge(toBeMerged, mergeResult);
-    }
-  }
-
-  /**
-   * A window set, useful for testing WindowFns, that simply
-   * collects the placed elements into per-window sets.
-   */
-  private static class TestWindowSet<W extends BoundedWindow, V> {
-
-    private Map<W, Set<V>> elements = new HashMap<>();
-
-    public void put(W window, V value) {
-      Set<V> all = elements.get(window);
-      if (all == null) {
-        all = new HashSet<>();
-        elements.put(window, all);
-      }
-      all.add(value);
-    }
-
-    public void merge(Collection<W> otherWindows, W window) {
-      if (otherWindows.isEmpty()) {
-        return;
-      }
-      Set<V> merged = new HashSet<>();
-      if (elements.containsKey(window) && !otherWindows.contains(window)) {
-        merged.addAll(elements.get(window));
-      }
-      for (W w : otherWindows) {
-        if (!elements.containsKey(w)) {
-          throw new IllegalArgumentException("Tried to merge a non-existent window:" + w);
-        }
-        merged.addAll(elements.get(w));
-        elements.remove(w);
-      }
-      elements.put(window, merged);
-    }
-
-    public Collection<W> windows() {
-      return elements.keySet();
-    }
-
-    // For testing.
-
-    public Set<V> get(W window) {
-      return elements.get(window);
-    }
-  }
-
-  /**
-   * Assigns the given {@code timestamp} to windows using the specified {@code windowFn}, and
-   * verifies that the result of {@code windowFn.getOutputTimestamp} for each window is within the
-   * proper bound.
-   */
-  public static <T, W extends BoundedWindow> void validateNonInterferingOutputTimes(
-      WindowFn<T, W> windowFn, long timestamp) throws Exception {
-    Collection<W> windows = WindowFnTestUtils.<T, W>assignedWindows(windowFn, timestamp);
-
-    Instant instant = new Instant(timestamp);
-    for (W window : windows) {
-      Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
-      assertFalse("getOutputTime must be greater than or equal to input timestamp",
-          outputTimestamp.isBefore(instant));
-      assertFalse("getOutputTime must be less than or equal to the max timestamp",
-          outputTimestamp.isAfter(window.maxTimestamp()));
-    }
-  }
-
-  /**
-   * Assigns the given {@code timestamp} to windows using the specified {@code windowFn}, and
-   * verifies that the result of {@link WindowFn#getOutputTime windowFn.getOutputTime} for later
-   * windows (as defined by {@code maxTimestamp}) won't prevent the watermark from passing the end
-   * of earlier windows.
-   *
-   * <p>This verifies that overlapping windows don't interfere at all. Depending on the
-   * {@code windowFn} this may be stricter than desired.
-   */
-  public static <T, W extends BoundedWindow> void validateGetOutputTimestamp(
-      WindowFn<T, W> windowFn, long timestamp) throws Exception {
-    Collection<W> windows = WindowFnTestUtils.<T, W>assignedWindows(windowFn, timestamp);
-    List<W> sortedWindows = new ArrayList<>(windows);
-    Collections.sort(sortedWindows, new Comparator<BoundedWindow>() {
-      @Override
-      public int compare(BoundedWindow o1, BoundedWindow o2) {
-        return o1.maxTimestamp().compareTo(o2.maxTimestamp());
-      }
-    });
-
-    Instant instant = new Instant(timestamp);
-    Instant endOfPrevious = null;
-    for (W window : sortedWindows) {
-      Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
-      if (endOfPrevious == null) {
-        // If this is the first window, the output timestamp can be anything, as long as it is in
-        // the valid range.
-        assertFalse("getOutputTime must be greater than or equal to input timestamp",
-            outputTimestamp.isBefore(instant));
-        assertFalse("getOutputTime must be less than or equal to the max timestamp",
-            outputTimestamp.isAfter(window.maxTimestamp()));
-      } else {
-        // If this is a later window, the output timestamp must be after the end of the previous
-        // window
-        assertTrue("getOutputTime must be greater than the end of the previous window",
-            outputTimestamp.isAfter(endOfPrevious));
-        assertFalse("getOutputTime must be less than or equal to the max timestamp",
-            outputTimestamp.isAfter(window.maxTimestamp()));
-      }
-      endOfPrevious = window.maxTimestamp();
-    }
-  }
-
-  /**
-   * Verifies that later-ending merged windows from any of the timestamps do not hold up output
-   * of earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
-   *
-   * <p>Given a list of lists of timestamps, where each list is expected to merge into a single
-   * window, and where the merged windows' end times are in ascending order, assigns and merges
-   * windows for each list (as though each were a separate key/user session). Then maps each
-   * timestamp in the list according to
-   * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
-   * {@link OutputTimeFn#combine outputTimeFn.combine()}.
-   *
-   * <p>Verifies that overlapping windows do not hold each other up via the watermark.
-   */
-  public static <T, W extends IntervalWindow>
-  void validateGetOutputTimestamps(
-      WindowFn<T, W> windowFn,
-      OutputTimeFn<? super W> outputTimeFn,
-      List<List<Long>> timestampsPerWindow) throws Exception {
-
-    // Assign windows to each timestamp, then merge them, storing the merged windows in
-    // a list in an order corresponding to timestampsPerWindow
-    final List<W> windows = new ArrayList<>();
-    for (List<Long> timestampsForWindow : timestampsPerWindow) {
-      final Set<W> windowsToMerge = new HashSet<>();
-
-      for (long timestamp : timestampsForWindow) {
-        windowsToMerge.addAll(
-            WindowFnTestUtils.<T, W>assignedWindows(windowFn, timestamp));
-      }
-
-      windowFn.mergeWindows(windowFn.new MergeContext() {
-        @Override
-        public Collection<W> windows() {
-          return windowsToMerge;
-        }
-
-        @Override
-        public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
-          windows.add(mergeResult);
-        }
-      });
-    }
-
-    // Map every list of input timestamps to an output timestamp
-    final List<Instant> combinedOutputTimestamps = new ArrayList<>();
-    for (int i = 0; i < timestampsPerWindow.size(); ++i) {
-      List<Long> timestampsForWindow = timestampsPerWindow.get(i);
-      W window = windows.get(i);
-
-      List<Instant> outputInstants = new ArrayList<>();
-      for (long inputTimestamp : timestampsForWindow) {
-        outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
-      }
-
-      combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
-    }
-
-    // Consider windows in increasing order of max timestamp; ensure the output timestamp is after
-    // the max timestamp of the previous window
-    @Nullable W earlierEndingWindow = null;
-    for (int i = 0; i < windows.size(); ++i) {
-      W window = windows.get(i);
-      ReadableInstant outputTimestamp = combinedOutputTimestamps.get(i);
-
-      if (earlierEndingWindow != null) {
-        assertThat(outputTimestamp,
-            greaterThan((ReadableInstant) earlierEndingWindow.maxTimestamp()));
-      }
-
-      earlierEndingWindow = window;
-    }
-  }
-}

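For context: a minimal sketch of driving the deleted window-timestamp validators from a test. It assumes the pre-reorganization com.google.cloud.dataflow.sdk packages, plus the Sessions WindowFn and the OutputTimeFns factory from the same SDK; the test class name and timestamp values are hypothetical.

import com.google.cloud.dataflow.sdk.testing.WindowFnTestUtils;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;

import org.joda.time.Duration;

import java.util.Arrays;

public class WindowTimestampValidationSketch {
  public static void main(String[] args) throws Exception {
    Sessions sessions = Sessions.withGapDuration(Duration.standardMinutes(10));

    // Per-window check: every assigned window's output time must fall between
    // the input timestamp and the window's maxTimestamp().
    WindowFnTestUtils.validateNonInterferingOutputTimes(sessions, 0L);

    // Merging check: each inner list merges into one session, and the merged
    // sessions' end times are in ascending order. The combined output time of
    // a later session must not hold up the end of an earlier session.
    WindowFnTestUtils.validateGetOutputTimestamps(
        sessions,
        OutputTimeFns.outputAtEarliestInputTimestamp(),
        Arrays.asList(
            Arrays.asList(0L, 1L, 2L),
            Arrays.asList(10000000L, 10000001L)));
  }
}

Sessions is a natural fixture here because its merging behavior exercises both the per-window bound checks and the cross-window watermark check.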
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java
deleted file mode 100644
index d6f075d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines utilities for unit testing Dataflow pipelines. The tests for the {@code PTransform}s and
- * examples included the Dataflow SDK provide examples of using these utilities.
- */
-package com.google.cloud.dataflow.sdk.testing;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java
deleted file mode 100644
index 7e56dda..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-
-/**
- * An {@code Aggregator<InputT, OutputT>} enables monitoring of values of type {@code InputT},
- * combined across all bundles into a result of type {@code OutputT}.
- *
- * <p>Aggregators are created by calling {@link DoFn#createAggregator},
- * typically from the {@link DoFn} constructor. Elements can be added to the
- * {@code Aggregator} by calling {@link Aggregator#addValue}.
- *
- * <p>Aggregators are visible, along with their current values, in the monitoring UI
- * when the pipeline is run using DataflowPipelineRunner or
- * BlockingDataflowPipelineRunner. Aggregators may not become visible until the
- * system begins executing the ParDo transform that created them or until their
- * initial value is changed.
- *
- * <p>Example:
- * <pre> {@code
- * class MyDoFn extends DoFn<String, String> {
- *   private Aggregator<Integer, Integer> myAggregator;
- *
- *   public MyDoFn() {
- *     myAggregator = createAggregator("myAggregator", new Sum.SumIntegerFn());
- *   }
- *
- *   @Override
- *   public void processElement(ProcessContext c) {
- *     myAggregator.addValue(1);
- *   }
- * }
- * } </pre>
- *
- * @param <InputT> the type of input values
- * @param <OutputT> the type of output values
- */
-public interface Aggregator<InputT, OutputT> {
-
-  /**
-   * Adds a new value into the Aggregator.
-   */
-  void addValue(InputT value);
-
-  /**
-   * Returns the name of the Aggregator.
-   */
-  String getName();
-
-  /**
-   * Returns the {@link CombineFn}, which combines input elements in the
-   * aggregator.
-   */
-  CombineFn<InputT, ?, OutputT> getCombineFn();
-
-  // TODO: Consider the following additional API conveniences:
-  // - In addition to createAggregator(), consider adding getAggregator() to
-  //   avoid the need to store the aggregator locally in a DoFn, i.e., create
-  //   if not already present.
-  // - Add a shortcut for the most common aggregator:
-  //   c.createAggregator("name", new Sum.SumIntegerFn()).
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java
deleted file mode 100644
index 4bbea85..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import java.util.Collection;
-
-/**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
- */
-public final class AggregatorRetriever {
-  private AggregatorRetriever() {
-    // do not instantiate
-  }
-
-  /**
-   * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
-   */
-  public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
-    return fn.getAggregators();
-  }
-}
-

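A sketch of how this internal hook pairs with the Aggregator interface above; MyDoFn is the hypothetical DoFn from the Aggregator javadoc example, and the printed format is illustrative only.

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.AggregatorRetriever;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

public class AggregatorListingSketch {
  public static void main(String[] args) {
    // Hypothetical: MyDoFn is the DoFn from the Aggregator javadoc example,
    // which creates one aggregator named "myAggregator" with Sum.SumIntegerFn.
    DoFn<String, String> fn = new MyDoFn();
    for (Aggregator<?, ?> aggregator : AggregatorRetriever.getAggregators(fn)) {
      // Each aggregator exposes its name and the CombineFn that merges its inputs.
      System.out.println(aggregator.getName()
          + " combines with " + aggregator.getCombineFn().getClass().getSimpleName());
    }
  }
}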
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java
deleted file mode 100644
index 7b3d87d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-
-/**
- * Represents the application of a {@link PTransform} to a specific input to produce
- * a specific output.
- *
- * <p>For internal use.
- *
- * @param <InputT> transform input type
- * @param <OutputT> transform output type
- * @param <TransformT> transform type
- */
-public class AppliedPTransform
-    <InputT extends PInput, OutputT extends POutput,
-     TransformT extends PTransform<? super InputT, OutputT>> {
-
-  private final String fullName;
-  private final InputT input;
-  private final OutputT output;
-  private final TransformT transform;
-
-  private AppliedPTransform(String fullName, InputT input, OutputT output, TransformT transform) {
-    this.input = input;
-    this.output = output;
-    this.transform = transform;
-    this.fullName = fullName;
-  }
-
-  public static <InputT extends PInput, OutputT extends POutput,
-                 TransformT extends PTransform<? super InputT, OutputT>>
-  AppliedPTransform<InputT, OutputT, TransformT> of(
-      String fullName, InputT input, OutputT output, TransformT transform) {
-    return new AppliedPTransform<InputT, OutputT, TransformT>(fullName, input, output, transform);
-  }
-
-  public String getFullName() {
-    return fullName;
-  }
-
-  public InputT getInput() {
-    return input;
-  }
-
-  public OutputT getOutput() {
-    return output;
-  }
-
-  public TransformT getTransform() {
-    return transform;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(getFullName(), getInput(), getOutput(), getTransform());
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof AppliedPTransform) {
-      AppliedPTransform<?, ?, ?> that = (AppliedPTransform<?, ?, ?>) other;
-      return Objects.equal(this.getFullName(), that.getFullName())
-          && Objects.equal(this.getInput(), that.getInput())
-          && Objects.equal(this.getOutput(), that.getOutput())
-          && Objects.equal(this.getTransform(), that.getTransform());
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("fullName", getFullName())
-        .add("input", getInput())
-        .add("output", getOutput())
-        .add("transform", getTransform())
-        .toString();
-  }
-}
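To illustrate the record-of-application semantics, a minimal sketch assuming TestPipeline, Create, and ParDo from the same SDK; MyDoFn is again the hypothetical DoFn<String, String> from the Aggregator example.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class AppliedPTransformSketch {
  public static void main(String[] args) {
    Pipeline p = TestPipeline.create();
    PCollection<String> input = p.apply(Create.of("a", "b"));

    // Hypothetical MyDoFn from the Aggregator example above.
    ParDo.Bound<String, String> fn = ParDo.named("MyStep").of(new MyDoFn());
    PCollection<String> output = input.apply(fn);

    // Records that 'fn' was applied to 'input' to produce 'output' under the
    // given full name. equals() and hashCode() are structural over all four parts.
    AppliedPTransform<PCollection<String>, PCollection<String>, ParDo.Bound<String, String>>
        applied = AppliedPTransform.of("MyStep", input, output, fn);

    System.out.println(applied.getFullName());  // prints "MyStep"
  }
}

In the SDK itself the full name is the hierarchical name assigned during application to the pipeline; the flat "MyStep" here is only for illustration.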