You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:51 UTC
[27/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
deleted file mode 100644
index b8f9b0b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Helper functions and test harnesses for checking correctness of {@link Source}
- * implementations.
- *
- * <p>Contains a few lightweight utilities (e.g. reading items from a source or a reader,
- * such as {@link #readFromSource} and {@link #readFromUnstartedReader}), as well as
- * heavyweight property testing and stress testing harnesses that help get a large
- * amount of test coverage with little code. The most notable ones are:
- * <ul>
- *   <li>{@link #assertSourcesEqualReferenceSource} helps test that the data read
- *   by the union of sources produced by {@link BoundedSource#splitIntoBundles}
- *   is the same as the data read by the original source.
- *   <li>If your source implements dynamic work rebalancing, use the
- *   {@code assertSplitAtFraction} family of functions - they test the behavior of
- *   {@link BoundedSource.BoundedReader#splitAtFraction}, in particular, that
- *   various consistency properties are respected and the total set of data read
- *   by the source is preserved when splits happen.
- *   Use {@link #assertSplitAtFractionBehavior} to test individual cases
- *   of {@code splitAtFraction} and use {@link #assertSplitAtFractionExhaustive}
- *   as a heavy-weight stress test including concurrency. We strongly recommend
- *   using both.
- * </ul>
- * For example usages, see the unit tests of classes such as
- * {@link com.google.cloud.dataflow.sdk.io.AvroSource} or
- * {@link com.google.cloud.dataflow.sdk.io.XmlSource}.
- *
- * <p>Like {@link DataflowAssert}, requires JUnit and Hamcrest to be present in the classpath.
- */
-public class SourceTestUtils {
-  /**
-   * Maximum number of racy concurrent split attempts that
-   * {@link #assertSplitAtFractionExhaustive} makes for a single reader position while waiting
-   * to observe both a successful and a failed split. Without this bound the stress test loops
-   * forever for a source whose racy split at that position always succeeds or always fails.
-   */
-  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100;
-
-  /**
-   * A wrapper around a value of type T that compares according to the structural
-   * value provided by a {@code Coder<T>}, but prints both the original and structural value,
-   * to help get good error messages from JUnit equality assertion failures and such.
-   */
-  private static class ReadableStructuralValue<T> {
-    private final T originalValue;
-    private final Object structuralValue;
-
-    public ReadableStructuralValue(T originalValue, Object structuralValue) {
-      this.originalValue = originalValue;
-      this.structuralValue = structuralValue;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(structuralValue);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      // instanceof is false for null, so no separate null check is needed.
-      if (!(obj instanceof ReadableStructuralValue)) {
-        return false;
-      }
-      return Objects.equals(structuralValue, ((ReadableStructuralValue<?>) obj).structuralValue);
-    }
-
-    @Override
-    public String toString() {
-      return String.format("[%s (structural %s)]", originalValue, structuralValue);
-    }
-  }
-
-  /**
-   * Wraps each element of {@code list}, in order, together with its
-   * {@link Coder#structuralValue} under {@code coder}.
-   *
-   * <p>Testing utilities below depend on standard assertions and matchers to compare elements
-   * read by sources. In general the elements may not implement {@code equals}/{@code hashCode}
-   * properly, however every source has a {@link Coder} and every {@code Coder} can produce a
-   * {@link Coder#structuralValue} whose {@code equals}/{@code hashCode} is consistent with
-   * equality of encoded format. So we use this {@link Coder#structuralValue} to compare elements
-   * read by sources.
-   */
-  public static <T> List<ReadableStructuralValue<T>> createStructuralValues(
-      Coder<T> coder, List<T> list)
-      throws Exception {
-    List<ReadableStructuralValue<T>> result = new ArrayList<>(list.size());
-    for (T elem : list) {
-      result.add(new ReadableStructuralValue<>(elem, coder.structuralValue(elem)));
-    }
-    return result;
-  }
-
-  /**
-   * Reads all elements from the given {@link BoundedSource}, using a fresh reader that is
-   * closed before returning.
-   */
-  public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options)
-      throws IOException {
-    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
-      return readFromUnstartedReader(reader);
-    }
-  }
-
-  /**
-   * Reads all elements from the given unstarted {@link Source.Reader}, calling
-   * {@link Source.Reader#start} first. Does not close the reader.
-   */
-  public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException {
-    return readRemainingFromReader(reader, false);
-  }
-
-  /**
-   * Reads all remaining elements from the given started {@link Source.Reader} using
-   * {@link Source.Reader#advance}. Does not close the reader.
-   */
-  public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException {
-    return readRemainingFromReader(reader, true);
-  }
-
-  /**
-   * Reads elements from an unstarted {@link Source.Reader} until n elements are read.
-   */
-  public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)
-      throws IOException {
-    return readNItemsFromReader(reader, n, false);
-  }
-
-  /**
-   * Reads elements from a {@link Source.Reader} that has already had {@link Source.Reader#start}
-   * called on it, until n elements are read.
-   */
-  public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n)
-      throws IOException {
-    return readNItemsFromReader(reader, n, true);
-  }
-
-  /**
-   * Reads elements from a {@link Source.Reader} until n elements are read.
-   *
-   * <p>There must be at least n elements remaining in the reader, except for
-   * the case when n is {@code Integer.MAX_VALUE}, which means "read all
-   * remaining elements".
-   *
-   * @param started whether {@link Source.Reader#start} has already been called; if not, the
-   *     first element is obtained via {@code start()} and the rest via {@code advance()}
-   */
-  private static <T> List<T> readNItemsFromReader(Source.Reader<T> reader, int n, boolean started)
-      throws IOException {
-    List<T> res = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
-      boolean shouldStart = (i == 0 && !started);
-      boolean more = shouldStart ? reader.start() : reader.advance();
-      if (n != Integer.MAX_VALUE) {
-        assertTrue(
-            String.format(
-                "Expected to read %d items, but the reader was exhausted after %d items", n, i),
-            more);
-      }
-      if (!more) {
-        break;
-      }
-      res.add(reader.getCurrent());
-    }
-    return res;
-  }
-
-  /**
-   * Reads all remaining elements from a {@link Source.Reader}.
-   *
-   * @param started whether {@link Source.Reader#start} has already been called on the reader
-   */
-  public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started)
-      throws IOException {
-    return readNItemsFromReader(reader, Integer.MAX_VALUE, started);
-  }
-
-  /**
-   * Given a reference {@code Source} and a list of {@code Source}s, asserts that the union of
-   * the records read from the list of sources is equal to the records read from the reference
-   * source (ignoring order, comparing by {@link Coder#structuralValue}). Also asserts that every
-   * source in the list has a default output coder equal to the reference source's.
-   */
-  public static <T> void assertSourcesEqualReferenceSource(
-      BoundedSource<T> referenceSource,
-      List<? extends BoundedSource<T>> sources,
-      PipelineOptions options)
-      throws Exception {
-    Coder<T> coder = referenceSource.getDefaultOutputCoder();
-    List<T> referenceRecords = readFromSource(referenceSource, options);
-    List<T> bundleRecords = new ArrayList<>();
-    for (BoundedSource<T> source : sources) {
-      assertThat(
-          "Coder type for source "
-              + source
-              + " is not compatible with Coder type for referenceSource "
-              + referenceSource,
-          source.getDefaultOutputCoder(),
-          equalTo(coder));
-      List<T> elems = readFromSource(source, options);
-      bundleRecords.addAll(elems);
-    }
-    List<ReadableStructuralValue<T>> bundleValues =
-        createStructuralValues(coder, bundleRecords);
-    List<ReadableStructuralValue<T>> referenceValues =
-        createStructuralValues(coder, referenceRecords);
-    assertThat(bundleValues, containsInAnyOrder(referenceValues.toArray()));
-  }
-
-  /**
-   * Asserts that a {@code Reader} returns a {@code Source} that, when read from, produces the same
-   * records as the reader (ignoring order). The reader is consumed but not closed.
-   */
-  public static <T> void assertUnstartedReaderReadsSameAsItsSource(
-      BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
-    Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
-    List<T> expected = readFromUnstartedReader(reader);
-    List<T> actual = readFromSource(reader.getCurrentSource(), options);
-    List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
-    List<ReadableStructuralValue<T>> actualStructural = createStructuralValues(coder, actual);
-    assertThat(actualStructural, containsInAnyOrder(expectedStructural.toArray()));
-  }
-
-  /**
-   * Expected outcome of
-   * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader#splitAtFraction}.
-   */
-  public enum ExpectedSplitOutcome {
-    /**
-     * The operation must succeed and the results must be consistent.
-     */
-    MUST_SUCCEED_AND_BE_CONSISTENT,
-    /**
-     * The operation must fail (return {@code null}).
-     */
-    MUST_FAIL,
-    /**
-     * The operation must either fail, or succeed and the results be consistent.
-     */
-    MUST_BE_CONSISTENT_IF_SUCCEEDS
-  }
-
-  /**
-   * Contains two values: the number of items in the primary source, and the number of items in
-   * the residual source, -1 if split failed.
-   */
-  private static class SplitAtFractionResult {
-    public final int numPrimaryItems;
-    public final int numResidualItems;
-
-    public SplitAtFractionResult(int numPrimaryItems, int numResidualItems) {
-      this.numPrimaryItems = numPrimaryItems;
-      this.numResidualItems = numResidualItems;
-    }
-  }
-
-  /**
-   * Asserts that the {@code source}'s reader behaves as described by {@code expectedOutcome} when
-   * {@code splitAtFraction(splitFraction)} is called after reading
-   * {@code numItemsToReadBeforeSplit} items. Whenever the split succeeds, the result is checked
-   * for consistency as in {@link #assertSplitAtFractionSucceedsAndConsistent}.
-   *
-   * @return the number of items in the primary and residual sources (residual -1 if the split
-   *     failed)
-   */
-  public static <T> SplitAtFractionResult assertSplitAtFractionBehavior(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      ExpectedSplitOutcome expectedOutcome,
-      PipelineOptions options)
-      throws Exception {
-    return assertSplitAtFractionBehaviorImpl(
-        source, readFromSource(source, options), numItemsToReadBeforeSplit, splitFraction,
-        expectedOutcome, options);
-  }
-
-  /**
-   * Compares two lists elementwise and throws a detailed assertion failure optimized for
-   * human reading in case they are unequal.
-   */
-  private static <T> void assertListsEqualInOrder(
-      String message, String expectedLabel, List<T> expected, String actualLabel, List<T> actual) {
-    int i = 0;
-    for (; i < expected.size() && i < actual.size(); ++i) {
-      if (!Objects.equals(expected.get(i), actual.get(i))) {
-        Assert.fail(String.format(
-            "%s: %s and %s have %d items in common and then differ. "
-                + "Item in %s (%d more): %s, item in %s (%d more): %s",
-            message, expectedLabel, actualLabel, i,
-            expectedLabel, expected.size() - i - 1, expected.get(i),
-            actualLabel, actual.size() - i - 1, actual.get(i)));
-      }
-    }
-    if (i < expected.size() /* but i == actual.size() */) {
-      Assert.fail(String.format(
-          "%s: %s has %d more items after matching all %d from %s. First 5: %s",
-          message, expectedLabel, expected.size() - actual.size(), actual.size(), actualLabel,
-          expected.subList(actual.size(), Math.min(expected.size(), actual.size() + 5))));
-    } else if (i < actual.size() /* but i == expected.size() */) {
-      Assert.fail(String.format(
-          "%s: %s has %d more items after matching all %d from %s. First 5: %s",
-          message, actualLabel, actual.size() - expected.size(), expected.size(), expectedLabel,
-          actual.subList(expected.size(), Math.min(actual.size(), expected.size() + 5))));
-    } else {
-      // All is well.
-    }
-  }
-
-  /**
-   * Creates a fresh reader of {@code source}, reads {@code numItemsToReadBeforeSplit} items,
-   * splits at {@code splitFraction}, checks the outcome against {@code expectedOutcome}, reads
-   * the rest, and verifies the split result against {@code expectedItems}.
-   */
-  private static <T> SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehaviorImpl(
-      BoundedSource<T> source, List<T> expectedItems, int numItemsToReadBeforeSplit,
-      double splitFraction, ExpectedSplitOutcome expectedOutcome, PipelineOptions options)
-      throws Exception {
-    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
-      BoundedSource<T> originalSource = reader.getCurrentSource();
-      List<T> currentItems = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplit);
-      BoundedSource<T> residual = reader.splitAtFraction(splitFraction);
-      if (residual != null) {
-        // Identity (==) checks: a successful split must install a new primary source object and
-        // return a residual that is a distinct object from that primary.
-        assertFalse(
-            String.format(
-                "Primary source didn't change after a successful split of %s at %f "
-                    + "after reading %d items. "
-                    + "Was the source object mutated instead of creating a new one? "
-                    + "Source objects MUST be immutable.",
-                source, splitFraction, numItemsToReadBeforeSplit),
-            reader.getCurrentSource() == originalSource);
-        assertFalse(
-            String.format(
-                "Residual source is the same object as the new primary source after a successful "
-                    + "split of %s at %f after reading %d items. "
-                    + "The primary and residual sources MUST be distinct objects.",
-                source, splitFraction, numItemsToReadBeforeSplit),
-            reader.getCurrentSource() == residual);
-      }
-      // Failure cases are: must succeed but fails; must fail but succeeds.
-      switch (expectedOutcome) {
-        case MUST_SUCCEED_AND_BE_CONSISTENT:
-          assertNotNull(
-              "Failed to split reader of source: "
-                  + source
-                  + " at "
-                  + splitFraction
-                  + " after reading "
-                  + numItemsToReadBeforeSplit
-                  + " items",
-              residual);
-          break;
-        case MUST_FAIL:
-          assertEquals(
-              "Expected split of reader of source: "
-                  + source
-                  + " at "
-                  + splitFraction
-                  + " after reading "
-                  + numItemsToReadBeforeSplit
-                  + " items to fail, but it succeeded",
-              null,
-              residual);
-          break;
-        case MUST_BE_CONSISTENT_IF_SUCCEEDS:
-          // Nothing.
-          break;
-      }
-      currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0));
-      BoundedSource<T> primary = reader.getCurrentSource();
-      return verifySingleSplitAtFractionResult(
-          source, expectedItems, currentItems, primary, residual,
-          numItemsToReadBeforeSplit, splitFraction, options);
-    }
-  }
-
-  /**
-   * Verifies one split. If {@code residual} is non-null, asserts that the items in
-   * {@code primary} equal {@code currentItems} in order, and that primary items followed by
-   * residual items equal {@code expectedItems} in order.
-   *
-   * @return item counts of primary and residual; the residual count is -1 when
-   *     {@code residual} is null
-   */
-  private static <T> SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFractionResult(
-      BoundedSource<T> source, List<T> expectedItems, List<T> currentItems,
-      BoundedSource<T> primary, BoundedSource<T> residual,
-      int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
-      throws Exception {
-    List<T> primaryItems = readFromSource(primary, options);
-    if (residual != null) {
-      List<T> residualItems = readFromSource(residual, options);
-      List<T> totalItems = new ArrayList<>();
-      totalItems.addAll(primaryItems);
-      totalItems.addAll(residualItems);
-      String errorMsgForPrimarySourceComp =
-          String.format(
-              "Continued reading after split yielded different items than primary source: "
-                  + "split at %s after reading %s items, original source: %s, primary source: %s",
-              splitFraction,
-              numItemsToReadBeforeSplit,
-              source,
-              primary);
-      String errorMsgForTotalSourceComp =
-          String.format(
-              "Items in primary and residual sources after split do not add up to items "
-                  + "in the original source. Split at %s after reading %s items; "
-                  + "original source: %s, primary: %s, residual: %s",
-              splitFraction,
-              numItemsToReadBeforeSplit,
-              source,
-              primary,
-              residual);
-      Coder<T> coder = primary.getDefaultOutputCoder();
-      List<ReadableStructuralValue<T>> primaryValues =
-          createStructuralValues(coder, primaryItems);
-      List<ReadableStructuralValue<T>> currentValues =
-          createStructuralValues(coder, currentItems);
-      List<ReadableStructuralValue<T>> expectedValues =
-          createStructuralValues(coder, expectedItems);
-      List<ReadableStructuralValue<T>> totalValues =
-          createStructuralValues(coder, totalItems);
-      assertListsEqualInOrder(
-          errorMsgForPrimarySourceComp, "current", currentValues, "primary", primaryValues);
-      assertListsEqualInOrder(
-          errorMsgForTotalSourceComp, "total", expectedValues, "primary+residual", totalValues);
-      return new SplitAtFractionResult(primaryItems.size(), residualItems.size());
-    }
-    return new SplitAtFractionResult(primaryItems.size(), -1);
-  }
-
-  /**
-   * Verifies some consistency properties of
-   * {@link BoundedSource.BoundedReader#splitAtFraction} on the given source. Equivalent to
-   * the following pseudocode:
-   * <pre>
-   *   Reader reader = source.createReader();
-   *   read N items from reader;
-   *   Source residual = reader.splitAtFraction(splitFraction);
-   *   Source primary = reader.getCurrentSource();
-   *   assert: items in primary == items we read so far
-   *                               + items we'll get by continuing to read from reader;
-   *   assert: items in original source == items in primary + items in residual
-   * </pre>
-   */
-  public static <T> void assertSplitAtFractionSucceedsAndConsistent(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      PipelineOptions options)
-      throws Exception {
-    assertSplitAtFractionBehavior(
-        source,
-        numItemsToReadBeforeSplit,
-        splitFraction,
-        ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT,
-        options);
-  }
-
-  /**
-   * Asserts that the {@code source}'s reader fails to {@code splitAtFraction(fraction)}
-   * after reading {@code numItemsToReadBeforeSplit} items.
-   */
-  public static <T> void assertSplitAtFractionFails(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      PipelineOptions options)
-      throws Exception {
-    assertSplitAtFractionBehavior(
-        source, numItemsToReadBeforeSplit, splitFraction, ExpectedSplitOutcome.MUST_FAIL, options);
-  }
-
-  /** Fractions found by {@link #assertSplitAtFractionBinary} for one start position. */
-  private static class SplitFractionStatistics {
-    /** Fractions at which the split succeeded. */
-    final List<Double> successfulFractions = new ArrayList<>();
-    /** Fractions at which the split succeeded with a non-empty residual. */
-    final List<Double> nonTrivialFractions = new ArrayList<>();
-  }
-
-  /**
-   * Asserts that given a start position,
-   * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
-   * between two fractions that differ by at least one item) can be called successfully and the
-   * results are consistent if a split succeeds.
-   *
-   * <p>{@code leftResult}/{@code rightResult} may be null, in which case they are computed here.
-   * Successful and non-trivial middle fractions are recorded in {@code stats}.
-   */
-  private static <T> void assertSplitAtFractionBinary(
-      BoundedSource<T> source,
-      List<T> expectedItems,
-      int numItemsToBeReadBeforeSplit,
-      double leftFraction,
-      SplitAtFractionResult leftResult,
-      double rightFraction,
-      SplitAtFractionResult rightResult,
-      PipelineOptions options,
-      SplitFractionStatistics stats)
-      throws Exception {
-    if (rightFraction - leftFraction < 0.001) {
-      // Do not recurse too deeply. Otherwise we will end up in infinite
-      // recursion, e.g., while trying to find the exact minimal fraction s.t.
-      // split succeeds. A precision of 0.001 when looking for such a fraction
-      // ought to be enough for everybody.
-      return;
-    }
-    double middleFraction = (rightFraction + leftFraction) / 2;
-    if (leftResult == null) {
-      leftResult = assertSplitAtFractionBehaviorImpl(
-          source, expectedItems, numItemsToBeReadBeforeSplit, leftFraction,
-          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    }
-    if (rightResult == null) {
-      rightResult = assertSplitAtFractionBehaviorImpl(
-          source, expectedItems, numItemsToBeReadBeforeSplit, rightFraction,
-          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    }
-    SplitAtFractionResult middleResult = assertSplitAtFractionBehaviorImpl(
-        source, expectedItems, numItemsToBeReadBeforeSplit, middleFraction,
-        ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    if (middleResult.numResidualItems != -1) {
-      stats.successfulFractions.add(middleFraction);
-    }
-    if (middleResult.numResidualItems > 0) {
-      stats.nonTrivialFractions.add(middleFraction);
-    }
-    // Two split fractions are equivalent if they yield the same number of
-    // items in primary vs. residual source. Left and right are already not
-    // equivalent. Recurse into [left, middle) and [middle, right) respectively
-    // if middle is not equivalent to left or right.
-    if (leftResult.numPrimaryItems != middleResult.numPrimaryItems) {
-      assertSplitAtFractionBinary(
-          source, expectedItems, numItemsToBeReadBeforeSplit,
-          leftFraction, leftResult, middleFraction, middleResult, options, stats);
-    }
-    if (rightResult.numPrimaryItems != middleResult.numPrimaryItems) {
-      assertSplitAtFractionBinary(
-          source, expectedItems, numItemsToBeReadBeforeSplit,
-          middleFraction, middleResult, rightFraction, rightResult, options, stats);
-    }
-  }
-
-  /**
-   * Asserts that for each possible start position,
-   * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
-   * between two fractions that differ by at least one item) can be called successfully and the
-   * results are consistent if a split succeeds. Verifies multithreaded splitting as well.
-   *
-   * <p>Fails if the source reads fewer than two items, or if no split (respectively no split
-   * with a non-empty residual) succeeds at any position.
-   */
-  public static <T> void assertSplitAtFractionExhaustive(
-      BoundedSource<T> source, PipelineOptions options) throws Exception {
-    List<T> expectedItems = readFromSource(source, options);
-    assertFalse("Empty source", expectedItems.isEmpty());
-    assertFalse("Source reads a single item", expectedItems.size() == 1);
-    List<List<Double>> allNonTrivialFractions = new ArrayList<>();
-    {
-      boolean anySuccessfulFractions = false;
-      boolean anyNonTrivialFractions = false;
-      for (int i = 0; i < expectedItems.size(); i++) {
-        SplitFractionStatistics stats = new SplitFractionStatistics();
-        assertSplitAtFractionBinary(source, expectedItems, i,
-            0.0, null, 1.0, null, options, stats);
-        if (!stats.successfulFractions.isEmpty()) {
-          anySuccessfulFractions = true;
-        }
-        if (!stats.nonTrivialFractions.isEmpty()) {
-          anyNonTrivialFractions = true;
-        }
-        allNonTrivialFractions.add(stats.nonTrivialFractions);
-      }
-      assertTrue(
-          "splitAtFraction test completed vacuously: no successful split fractions found",
-          anySuccessfulFractions);
-      assertTrue(
-          "splitAtFraction test completed vacuously: no non-trivial split fractions found",
-          anyNonTrivialFractions);
-    }
-    {
-      // Perform a stress test of "racy" concurrent splitting:
-      // for every position (number of items read), try to split at the minimum nontrivial
-      // split fraction for that position concurrently with reading the record at that position.
-      // To keep the test non-vacuous, retry until the split has succeeded at least once and
-      // failed at least once, but give up on a position after
-      // MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM attempts so that a race which always goes the
-      // same way cannot hang the test.
-      ExecutorService executor = Executors.newFixedThreadPool(2);
-      try {
-        for (int i = 0; i < expectedItems.size(); i++) {
-          double minNonTrivialFraction = 2.0; // Greater than any possible fraction.
-          for (double fraction : allNonTrivialFractions.get(i)) {
-            minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
-          }
-          if (minNonTrivialFraction == 2.0) {
-            // This will not happen all the time because otherwise the test above would
-            // detect vacuousness.
-            continue;
-          }
-          boolean haveSuccess = false;
-          boolean haveFailure = false;
-          for (int trial = 0;
-              trial < MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM && !(haveSuccess && haveFailure);
-              trial++) {
-            if (assertSplitAtFractionConcurrent(
-                executor, source, expectedItems, i, minNonTrivialFraction, options)) {
-              haveSuccess = true;
-            } else {
-              haveFailure = true;
-            }
-          }
-        }
-      } finally {
-        // Threads of a fixed thread pool live until the pool is shut down; release them and
-        // interrupt any task still blocked (e.g. a splitter waiting on its latch).
-        executor.shutdownNow();
-      }
-    }
-  }
-
-  /**
-   * Runs one racy split trial on {@code executor}: one task reads
-   * {@code numItemsToReadBeforeSplitting} items from a fresh reader, releases a second task that
-   * calls {@code splitAtFraction(fraction)} on the same reader, and keeps reading until the
-   * reader is exhausted. If the split succeeded, its consistency is verified against
-   * {@code expectedItems}.
-   *
-   * @return {@code true} if the split succeeded with a non-empty residual, {@code false}
-   *     otherwise
-   */
-  private static <T> boolean assertSplitAtFractionConcurrent(
-      ExecutorService executor, BoundedSource<T> source, List<T> expectedItems,
-      final int numItemsToReadBeforeSplitting, final double fraction, PipelineOptions options)
-      throws Exception {
-    @SuppressWarnings("resource") // Closed in readerThread
-    final BoundedSource.BoundedReader<T> reader = source.createReader(options);
-    final CountDownLatch unblockSplitter = new CountDownLatch(1);
-    Future<List<T>> readerThread =
-        executor.submit(
-            new Callable<List<T>>() {
-              @Override
-              public List<T> call() throws Exception {
-                try {
-                  List<T> items =
-                      readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplitting);
-                  unblockSplitter.countDown();
-                  items.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplitting > 0));
-                  return items;
-                } finally {
-                  reader.close();
-                }
-              }
-            });
-    Future<KV<BoundedSource<T>, BoundedSource<T>>> splitterThread = executor.submit(
-        new Callable<KV<BoundedSource<T>, BoundedSource<T>>>() {
-          @Override
-          public KV<BoundedSource<T>, BoundedSource<T>> call() throws Exception {
-            unblockSplitter.await();
-            BoundedSource<T> residual = reader.splitAtFraction(fraction);
-            if (residual == null) {
-              return null;
-            }
-            return KV.of(reader.getCurrentSource(), residual);
-          }
-        });
-    List<T> currentItems;
-    try {
-      currentItems = readerThread.get();
-    } catch (Exception e) {
-      // If reading failed before the latch was released, the splitter would wait on it forever;
-      // cancel (interrupt) it so it does not occupy an executor thread.
-      splitterThread.cancel(true);
-      throw e;
-    }
-    KV<BoundedSource<T>, BoundedSource<T>> splitSources = splitterThread.get();
-    if (splitSources == null) {
-      return false;
-    }
-    SplitAtFractionResult res = verifySingleSplitAtFractionResult(
-        source, expectedItems, currentItems, splitSources.getKey(), splitSources.getValue(),
-        numItemsToReadBeforeSplitting, fraction, options);
-    return (res.numResidualItems > 0);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 1afb691..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-
-/**
- * Options used to configure the {@link TestPipeline}. Declares no members of its own; all
- * options are inherited from {@link BlockingDataflowPipelineOptions}.
- */
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index 9fff070..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-  private static final String TENTATIVE_COUNTER = "tentative";
-
-  /** The options this runner was created with. */
-  private final TestDataflowPipelineOptions options;
-  /** The wrapped runner that pipelines and transforms are delegated to. */
-  private final DataflowPipelineRunner runner;
-  /** Number of DataflowAssert side-input assertion transforms applied through this runner. */
-  private int expectedNumberOfAssertions = 0;
-
-  /**
-   * Creates a test runner that wraps a {@link DataflowPipelineRunner} built from the same
-   * {@code options}.
-   */
-  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
-    this.runner = DataflowPipelineRunner.fromOptions(options);
-    this.options = options;
-  }
-
-  /**
-   * Constructs a runner from the provided options, viewed as
-   * {@link TestDataflowPipelineOptions}.
-   */
-  public static TestDataflowPipelineRunner fromOptions(PipelineOptions options) {
-    return new TestDataflowPipelineRunner(options.as(TestDataflowPipelineOptions.class));
-  }
-
-  /** Runs {@code pipeline} on the {@link DataflowPipelineRunner} this runner wraps. */
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    DataflowPipelineRunner delegate = this.runner;
-    return run(pipeline, delegate);
-  }
-
-  /**
-   * Submits {@code pipeline} to {@code runner} and blocks until the job's outcome is known.
-   *
-   * <p>Streaming jobs: a task submitted to {@code options.getExecutorService()} polls
-   * {@link #checkForSuccess} every 10 seconds and cancels the job when the task ends. Meanwhile
-   * this thread waits up to 10 minutes for the job, printing its messages. It cancels the job if
-   * a {@code JOB_MESSAGE_ERROR} message arrives, or if the job is still running (or its state is
-   * unknown) when the wait ends.
-   *
-   * <p>Batch jobs: waits for the job to finish (timeout argument {@code -1}; presumably
-   * unbounded, to be confirmed against {@link DataflowPipelineJob#waitToFinish}) and then calls
-   * {@link #checkForSuccess} once.
-   *
-   * @return the submitted job
-   * @throws IllegalStateException if submission fails with a {@link DataflowJobExecutionException}
-   *     (kept as the cause), if no success/failure metric was found, or if the job failed
-   */
-  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
-    final JobMessagesHandler messageHandler =
-        new MonitoringUtil.PrintHandler(options.getJobMessageOutput());
-    final DataflowPipelineJob job;
-    try {
-      job = runner.run(pipeline);
-    } catch (DataflowJobExecutionException ex) {
-      // Chain the original exception so the reason for the failure is not lost.
-      throw new IllegalStateException("The dataflow failed.", ex);
-    }
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    try {
-      final Optional<Boolean> result;
-      if (options.isStreaming()) {
-        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
-            new Callable<Optional<Boolean>>() {
-              @Override
-              public Optional<Boolean> call() throws Exception {
-                try {
-                  for (;;) {
-                    Optional<Boolean> polled = checkForSuccess(job);
-                    if (polled.isPresent()) {
-                      return polled;
-                    }
-                    Thread.sleep(10000L);
-                  }
-                } finally {
-                  LOG.info("Cancelling Dataflow job {}", job.getJobId());
-                  job.cancel();
-                }
-              }
-            });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
-          @Override
-          public void process(List<JobMessage> messages) {
-            messageHandler.process(messages);
-            for (JobMessage message : messages) {
-              // Constant-first equals also handles a null importance.
-              if ("JOB_MESSAGE_ERROR".equals(message.getMessageImportance())) {
-                LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
-                    job.getJobId(), message.getMessageText());
-                try {
-                  job.cancel();
-                } catch (Exception e) {
-                  throw Throwables.propagate(e);
-                }
-              }
-            }
-          }
-        });
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
-              job.getJobId());
-          job.cancel();
-        }
-        result = resultFuture.get();
-      } else {
-        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
-        result = checkForSuccess(job);
-      }
-      if (!result.isPresent()) {
-        throw new IllegalStateException(
-            "The dataflow did not output a success or failure metric.");
-      } else if (!result.get()) {
-        throw new IllegalStateException("The dataflow failed.");
-      }
-    } catch (InterruptedException e) {
-      // Restore the interrupt status before converting to an unchecked exception.
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw Throwables.propagate(e);
-    }
-    return job;
-  }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (transform instanceof DataflowAssert.OneSideInputAssert
- || transform instanceof DataflowAssert.TwoSideInputAssert) {
- expectedNumberOfAssertions += 1;
- }
-
- return runner.apply(transform, input);
- }
-
- Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
- throws IOException {
- State state = job.getState();
- if (state == State.FAILED || state == State.CANCELLED) {
- LOG.info("The pipeline failed");
- return Optional.of(false);
- }
-
- JobMetrics metrics = job.getDataflowClient().projects().jobs()
- .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
- if (metrics == null || metrics.getMetrics() == null) {
- LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
- } else {
- int successes = 0;
- int failures = 0;
- for (MetricUpdate metric : metrics.getMetrics()) {
- if (metric.getName() == null || metric.getName().getContext() == null
- || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
- // Don't double count using the non-tentative version of the metric.
- continue;
- }
- if (DataflowAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
- successes += ((BigDecimal) metric.getScalar()).intValue();
- } else if (DataflowAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
- failures += ((BigDecimal) metric.getScalar()).intValue();
- }
- }
-
- if (failures > 0) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(false);
- } else if (successes >= expectedNumberOfAssertions) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(true);
- }
-
- LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
- + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
- }
-
- return Optional.<Boolean>absent();
- }
-
- @Override
- public String toString() {
- return "TestDataflowPipelineRunner#" + options.getAppName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
deleted file mode 100644
index a05a778..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
-/**
- * A creator of test pipelines that can be used inside of tests that can be
- * configured to run locally or against the live service.
- *
- * <p>It is recommended to tag hand-selected tests for this purpose using the
- * RunnableOnService Category annotation, as each test run against the service
- * will spin up and tear down a single VM.
- *
- * <p>In order to run tests on the dataflow pipeline service, the following
- * conditions must be met:
- * <ul>
- * <li> runIntegrationTestOnService System property must be set to true.
- * <li> System property "projectName" must be set to your Cloud project.
- * <li> System property "temp_gcs_directory" must be set to a valid GCS bucket.
- * <li> Jars containing the SDK and test classes must be added to the test classpath.
- * </ul>
- *
- * <p>Use {@link DataflowAssert} for tests, as it integrates with this test
- * harness in both direct and remote execution modes. For example:
- *
- * <pre>{@code
- * Pipeline p = TestPipeline.create();
- * PCollection<Integer> output = ...
- *
- * DataflowAssert.that(output)
- * .containsInAnyOrder(1, 2, 3, 4);
- * p.run();
- * }</pre>
- *
- */
-public class TestPipeline extends Pipeline {
- private static final String PROPERTY_DATAFLOW_OPTIONS = "dataflowOptions";
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- /**
- * Creates and returns a new test pipeline.
- *
- * <p>Use {@link DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- */
- public static TestPipeline create() {
- return fromOptions(testingPipelineOptions());
- }
-
- public static TestPipeline fromOptions(PipelineOptions options) {
- return new TestPipeline(PipelineRunner.fromOptions(options), options);
- }
-
- /**
- * Returns whether a {@link TestPipeline} supports dynamic work rebalancing, and thus tests
- * of dynamic work rebalancing are expected to pass.
- */
- public boolean supportsDynamicWorkRebalancing() {
- return getRunner() instanceof DataflowPipelineRunner;
- }
-
- private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
- super(runner, options);
- }
-
- /**
- * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError}
- * that is raised during testing.
- */
- @Override
- public PipelineResult run() {
- try {
- return super.run();
- } catch (RuntimeException exc) {
- Throwable cause = exc.getCause();
- if (cause instanceof AssertionError) {
- throw (AssertionError) cause;
- } else {
- throw exc;
- }
- }
- }
-
- @Override
- public String toString() {
- return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
- }
-
- /**
- * Creates {@link PipelineOptions} for testing.
- */
- public static PipelineOptions testingPipelineOptions() {
- try {
- @Nullable String systemDataflowOptions = System.getProperty(PROPERTY_DATAFLOW_OPTIONS);
- PipelineOptions options =
- systemDataflowOptions == null
- ? PipelineOptionsFactory.create()
- : PipelineOptionsFactory.fromArgs(
- MAPPER.readValue(
- System.getProperty(PROPERTY_DATAFLOW_OPTIONS), String[].class))
- .as(PipelineOptions.class);
-
- options.as(ApplicationNameOptions.class).setAppName(getAppName());
- if (isIntegrationTest()) {
- // TODO: adjust everyone's integration test frameworks to set the runner class via the
- // pipeline options via PROPERTY_DATAFLOW_OPTIONS
- options.setRunner(TestDataflowPipelineRunner.class);
- } else {
- options.as(GcpOptions.class).setGcpCredential(new TestCredential());
- }
- options.setStableUniqueNames(CheckEnabled.ERROR);
- return options;
- } catch (IOException e) {
- throw new RuntimeException("Unable to instantiate test options from system property "
- + PROPERTY_DATAFLOW_OPTIONS + ":" + System.getProperty(PROPERTY_DATAFLOW_OPTIONS), e);
- }
- }
-
- /**
- * Returns whether a {@link TestPipeline} should be treated as an integration test.
- */
- private static boolean isIntegrationTest() {
- return Boolean.parseBoolean(System.getProperty("runIntegrationTestOnService"));
- }
-
- /** Returns the class + method name of the test, or a default name. */
- private static String getAppName() {
- Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();
- if (stackTraceElement.isPresent()) {
- String methodName = stackTraceElement.get().getMethodName();
- String className = stackTraceElement.get().getClassName();
- if (className.contains(".")) {
- className = className.substring(className.lastIndexOf(".") + 1);
- }
- return className + "-" + methodName;
- }
- return "UnitTest";
- }
-
- /** Returns the {@link StackTraceElement} of the calling class. */
- private static Optional<StackTraceElement> findCallersStackTrace() {
- Iterator<StackTraceElement> elements =
- Iterators.forArray(Thread.currentThread().getStackTrace());
- // First find the TestPipeline class in the stack trace.
- while (elements.hasNext()) {
- StackTraceElement next = elements.next();
- if (TestPipeline.class.getName().equals(next.getClassName())) {
- break;
- }
- }
- // Then find the first instance after that is not the TestPipeline
- while (elements.hasNext()) {
- StackTraceElement next = elements.next();
- if (!TestPipeline.class.getName().equals(next.getClassName())) {
- return Optional.of(next);
- }
- }
- return Optional.absent();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java
deleted file mode 100644
index dc0baf5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/WindowFnTestUtils.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-
-import org.joda.time.Instant;
-import org.joda.time.ReadableInstant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * A utility class for testing {@link WindowFn}s.
- */
-public class WindowFnTestUtils {
-
- /**
- * Creates a Set of elements to be used as expected output in
- * {@link #runWindowFn}.
- */
- public static Set<String> set(long... timestamps) {
- Set<String> result = new HashSet<>();
- for (long timestamp : timestamps) {
- result.add(timestampValue(timestamp));
- }
- return result;
- }
-
- /**
- * Runs the {@link WindowFn} over the provided input, returning a map
- * of windows to the timestamps in those windows.
- */
- public static <T, W extends BoundedWindow> Map<W, Set<String>> runWindowFn(
- WindowFn<T, W> windowFn,
- List<Long> timestamps) throws Exception {
-
- final TestWindowSet<W, String> windowSet = new TestWindowSet<W, String>();
- for (final Long timestamp : timestamps) {
- for (W window : windowFn.assignWindows(
- new TestAssignContext<T, W>(new Instant(timestamp), windowFn))) {
- windowSet.put(window, timestampValue(timestamp));
- }
- windowFn.mergeWindows(new TestMergeContext<T, W>(windowSet, windowFn));
- }
- Map<W, Set<String>> actual = new HashMap<>();
- for (W window : windowSet.windows()) {
- actual.put(window, windowSet.get(window));
- }
- return actual;
- }
-
- public static <T, W extends BoundedWindow> Collection<W> assignedWindows(
- WindowFn<T, W> windowFn, long timestamp) throws Exception {
- return windowFn.assignWindows(new TestAssignContext<T, W>(new Instant(timestamp), windowFn));
- }
-
- private static String timestampValue(long timestamp) {
- return "T" + new Instant(timestamp);
- }
-
- /**
- * Test implementation of AssignContext.
- */
- private static class TestAssignContext<T, W extends BoundedWindow>
- extends WindowFn<T, W>.AssignContext {
- private Instant timestamp;
-
- public TestAssignContext(Instant timestamp, WindowFn<T, W> windowFn) {
- windowFn.super();
- this.timestamp = timestamp;
- }
-
- @Override
- public T element() {
- return null;
- }
-
- @Override
- public Instant timestamp() {
- return timestamp;
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return null;
- }
- }
-
- /**
- * Test implementation of MergeContext.
- */
- private static class TestMergeContext<T, W extends BoundedWindow>
- extends WindowFn<T, W>.MergeContext {
- private TestWindowSet<W, ?> windowSet;
-
- public TestMergeContext(
- TestWindowSet<W, ?> windowSet, WindowFn<T, W> windowFn) {
- windowFn.super();
- this.windowSet = windowSet;
- }
-
- @Override
- public Collection<W> windows() {
- return windowSet.windows();
- }
-
- @Override
- public void merge(Collection<W> toBeMerged, W mergeResult) {
- windowSet.merge(toBeMerged, mergeResult);
- }
- }
-
- /**
- * A WindowSet useful for testing WindowFns that simply
- * collects the placed elements into multisets.
- */
- private static class TestWindowSet<W extends BoundedWindow, V> {
-
- private Map<W, Set<V>> elements = new HashMap<>();
-
- public void put(W window, V value) {
- Set<V> all = elements.get(window);
- if (all == null) {
- all = new HashSet<>();
- elements.put(window, all);
- }
- all.add(value);
- }
-
- public void merge(Collection<W> otherWindows, W window) {
- if (otherWindows.isEmpty()) {
- return;
- }
- Set<V> merged = new HashSet<>();
- if (elements.containsKey(window) && !otherWindows.contains(window)) {
- merged.addAll(elements.get(window));
- }
- for (W w : otherWindows) {
- if (!elements.containsKey(w)) {
- throw new IllegalArgumentException("Tried to merge a non-existent window:" + w);
- }
- merged.addAll(elements.get(w));
- elements.remove(w);
- }
- elements.put(window, merged);
- }
-
- public Collection<W> windows() {
- return elements.keySet();
- }
-
- // For testing.
-
- public Set<V> get(W window) {
- return elements.get(window);
- }
- }
-
- /**
- * Assigns the given {@code timestamp} to windows using the specified {@code windowFn}, and
- * verifies that result of {@code windowFn.getOutputTimestamp} for each window is within the
- * proper bound.
- */
- public static <T, W extends BoundedWindow> void validateNonInterferingOutputTimes(
- WindowFn<T, W> windowFn, long timestamp) throws Exception {
- Collection<W> windows = WindowFnTestUtils.<T, W>assignedWindows(windowFn, timestamp);
-
- Instant instant = new Instant(timestamp);
- for (W window : windows) {
- Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
- assertFalse("getOutputTime must be greater than or equal to input timestamp",
- outputTimestamp.isBefore(instant));
- assertFalse("getOutputTime must be less than or equal to the max timestamp",
- outputTimestamp.isAfter(window.maxTimestamp()));
- }
- }
-
- /**
- * Assigns the given {@code timestamp} to windows using the specified {@code windowFn}, and
- * verifies that result of {@link WindowFn#getOutputTime windowFn.getOutputTime} for later windows
- * (as defined by {@code maxTimestamp} won't prevent the watermark from passing the end of earlier
- * windows.
- *
- * <p>This verifies that overlapping windows don't interfere at all. Depending on the
- * {@code windowFn} this may be stricter than desired.
- */
- public static <T, W extends BoundedWindow> void validateGetOutputTimestamp(
- WindowFn<T, W> windowFn, long timestamp) throws Exception {
- Collection<W> windows = WindowFnTestUtils.<T, W>assignedWindows(windowFn, timestamp);
- List<W> sortedWindows = new ArrayList<>(windows);
- Collections.sort(sortedWindows, new Comparator<BoundedWindow>() {
- @Override
- public int compare(BoundedWindow o1, BoundedWindow o2) {
- return o1.maxTimestamp().compareTo(o2.maxTimestamp());
- }
- });
-
- Instant instant = new Instant(timestamp);
- Instant endOfPrevious = null;
- for (W window : sortedWindows) {
- Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
- if (endOfPrevious == null) {
- // If this is the first window, the output timestamp can be anything, as long as it is in
- // the valid range.
- assertFalse("getOutputTime must be greater than or equal to input timestamp",
- outputTimestamp.isBefore(instant));
- assertFalse("getOutputTime must be less than or equal to the max timestamp",
- outputTimestamp.isAfter(window.maxTimestamp()));
- } else {
- // If this is a later window, the output timestamp must be after the end of the previous
- // window
- assertTrue("getOutputTime must be greater than the end of the previous window",
- outputTimestamp.isAfter(endOfPrevious));
- assertFalse("getOutputTime must be less than or equal to the max timestamp",
- outputTimestamp.isAfter(window.maxTimestamp()));
- }
- endOfPrevious = window.maxTimestamp();
- }
- }
-
- /**
- * Verifies that later-ending merged windows from any of the timestamps hold up output of
- * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
- *
- * <p>Given a list of lists of timestamps, where each list is expected to merge into a single
- * window with end times in ascending order, assigns and merges windows for each list (as though
- * each were a separate key/user session). Then maps each timestamp in the list according to
- * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
- * {@link OutputTimeFn#combine outputTimeFn.combine()}.
- *
- * <p>Verifies that a overlapping windows do not hold each other up via the watermark.
- */
- public static <T, W extends IntervalWindow>
- void validateGetOutputTimestamps(
- WindowFn<T, W> windowFn,
- OutputTimeFn<? super W> outputTimeFn,
- List<List<Long>> timestampsPerWindow) throws Exception {
-
- // Assign windows to each timestamp, then merge them, storing the merged windows in
- // a list in corresponding order to timestampsPerWindow
- final List<W> windows = new ArrayList<>();
- for (List<Long> timestampsForWindow : timestampsPerWindow) {
- final Set<W> windowsToMerge = new HashSet<>();
-
- for (long timestamp : timestampsForWindow) {
- windowsToMerge.addAll(
- WindowFnTestUtils.<T, W>assignedWindows(windowFn, timestamp));
- }
-
- windowFn.mergeWindows(windowFn.new MergeContext() {
- @Override
- public Collection<W> windows() {
- return windowsToMerge;
- }
-
- @Override
- public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
- windows.add(mergeResult);
- }
- });
- }
-
- // Map every list of input timestamps to an output timestamp
- final List<Instant> combinedOutputTimestamps = new ArrayList<>();
- for (int i = 0; i < timestampsPerWindow.size(); ++i) {
- List<Long> timestampsForWindow = timestampsPerWindow.get(i);
- W window = windows.get(i);
-
- List<Instant> outputInstants = new ArrayList<>();
- for (long inputTimestamp : timestampsForWindow) {
- outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
- }
-
- combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
- }
-
- // Consider windows in increasing order of max timestamp; ensure the output timestamp is after
- // the max timestamp of the previous
- @Nullable W earlierEndingWindow = null;
- for (int i = 0; i < windows.size(); ++i) {
- W window = windows.get(i);
- ReadableInstant outputTimestamp = combinedOutputTimestamps.get(i);
-
- if (earlierEndingWindow != null) {
- assertThat(outputTimestamp,
- greaterThan((ReadableInstant) earlierEndingWindow.maxTimestamp()));
- }
-
- earlierEndingWindow = window;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java
deleted file mode 100644
index d6f075d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines utilities for unit testing Dataflow pipelines. The tests for the {@code PTransform}s and
- * examples included in the Dataflow SDK provide examples of using these utilities.
- */
-package com.google.cloud.dataflow.sdk.testing;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java
deleted file mode 100644
index 7e56dda..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-
-/**
- * An {@code Aggregator<InputT, OutputT>} enables monitoring of values of type {@code InputT},
- * to be combined across all bundles into a result of type {@code OutputT} by the aggregator's
- * {@link CombineFn} (see {@link #getCombineFn}).
- *
- * <p>Aggregators are created by calling {@link DoFn#createAggregator},
- * typically from the {@link DoFn} constructor. Elements can be added to the
- * {@code Aggregator} by calling {@link Aggregator#addValue}.
- *
- * <p>Aggregators are visible in the monitoring UI, when the pipeline is run
- * using DataflowPipelineRunner or BlockingDataflowPipelineRunner, along with
- * their current value. Aggregators may not become visible until the system
- * begins executing the ParDo transform that created them and/or their initial
- * value is changed.
- *
- * <p>Example:
- * <pre> {@code
- * class MyDoFn extends DoFn<String, String> {
- *   private Aggregator<Integer, Integer> myAggregator;
- *
- *   public MyDoFn() {
- *     myAggregator = createAggregator("myAggregator", new Sum.SumIntegerFn());
- *   }
- *
- *   @Override
- *   public void processElement(ProcessContext c) {
- *     myAggregator.addValue(1);
- *   }
- * }
- * } </pre>
- *
- * @param <InputT> the type of input values added via {@link #addValue}
- * @param <OutputT> the type of the combined output value
- */
-public interface Aggregator<InputT, OutputT> {
-
-  /**
-   * Adds a new value into the Aggregator.
-   */
-  void addValue(InputT value);
-
-  /**
-   * Returns the name of the Aggregator.
-   */
-  String getName();
-
-  /**
-   * Returns the {@link CombineFn}, which combines input elements in the
-   * aggregator into an output value of type {@code OutputT}.
-   */
-  CombineFn<InputT, ?, OutputT> getCombineFn();
-
-  // TODO: Consider the following additional API conveniences:
-  // - In addition to createAggregator(), consider adding getAggregator() to
-  //   avoid the need to store the aggregator locally in a DoFn, i.e., create
-  //   if not already present.
-  // - Add a shortcut for the most common aggregator:
-  //   c.createAggregator("name", new Sum.SumIntegerFn()).
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java
deleted file mode 100644
index 4bbea85..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AggregatorRetriever.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import java.util.Collection;
-
-/**
- * Internal helper that exposes the {@link Aggregator Aggregators} a {@link DoFn} has created.
- * Contains only static methods and cannot be instantiated.
- */
-public final class AggregatorRetriever {
-  /**
-   * Returns the {@link Aggregator Aggregators} created by {@code fn}.
-   *
-   * @param fn the {@link DoFn} whose aggregators are requested
-   * @return the aggregators reported by {@code fn}
-   */
-  public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
-    Collection<Aggregator<?, ?>> aggregators = fn.getAggregators();
-    return aggregators;
-  }
-
-  /** Static-only holder; not meant to be instantiated. */
-  private AggregatorRetriever() {}
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java
deleted file mode 100644
index 7b3d87d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/AppliedPTransform.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-
-/**
- * Represents the application of a {@link PTransform} to a specific input to produce
- * a specific output.
- *
- * <p>For internal use.
- *
- * @param <InputT> transform input type
- * @param <OutputT> transform output type
- * @param <TransformT> transform type
- */
-public class AppliedPTransform
- <InputT extends PInput, OutputT extends POutput,
- TransformT extends PTransform<? super InputT, OutputT>> {
-
- private final String fullName;
- private final InputT input;
- private final OutputT output;
- private final TransformT transform;
-
- private AppliedPTransform(String fullName, InputT input, OutputT output, TransformT transform) {
- this.input = input;
- this.output = output;
- this.transform = transform;
- this.fullName = fullName;
- }
-
- public static <InputT extends PInput, OutputT extends POutput,
- TransformT extends PTransform<? super InputT, OutputT>>
- AppliedPTransform<InputT, OutputT, TransformT> of(
- String fullName, InputT input, OutputT output, TransformT transform) {
- return new AppliedPTransform<InputT, OutputT, TransformT>(fullName, input, output, transform);
- }
-
- public String getFullName() {
- return fullName;
- }
-
- public InputT getInput() {
- return input;
- }
-
- public OutputT getOutput() {
- return output;
- }
-
- public TransformT getTransform() {
- return transform;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(getFullName(), getInput(), getOutput(), getTransform());
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof AppliedPTransform) {
- AppliedPTransform<?, ?, ?> that = (AppliedPTransform<?, ?, ?>) other;
- return Objects.equal(this.getFullName(), that.getFullName())
- && Objects.equal(this.getInput(), that.getInput())
- && Objects.equal(this.getOutput(), that.getOutput())
- && Objects.equal(this.getTransform(), that.getTransform());
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("fullName", getFullName())
- .add("input", getInput())
- .add("output", getOutput())
- .add("transform", getTransform())
- .toString();
- }
-}