You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:52 UTC

[28/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java
deleted file mode 100644
index 6c9643c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java
+++ /dev/null
@@ -1,825 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.MapCoder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * An assertion on the contents of a {@link PCollection}
- * incorporated into the pipeline.  Such an assertion
- * can be checked no matter what kind of {@link PipelineRunner} is
- * used.
- *
- * <p>Note that the {@code DataflowAssert} call must precede the call
- * to {@link Pipeline#run}.
- *
- * <p>Examples of use:
- * <pre>{@code
- * Pipeline p = TestPipeline.create();
- * ...
- * PCollection<String> output =
- *      input
- *      .apply(ParDo.of(new TestDoFn()));
- * DataflowAssert.that(output)
- *     .containsInAnyOrder("out1", "out2", "out3");
- * ...
- * PCollection<Integer> ints = ...
- * PCollection<Integer> sum =
- *     ints
- *     .apply(Combine.globally(new SumInts()));
- * DataflowAssert.that(sum)
- *     .is(42);
- * ...
- * p.run();
- * }</pre>
- *
- * <p>JUnit and Hamcrest must be linked in by any code that uses DataflowAssert.
- */
-public class DataflowAssert {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DataflowAssert.class);
-
-  static final String SUCCESS_COUNTER = "DataflowAssertSuccess";
-  static final String FAILURE_COUNTER = "DataflowAssertFailure";
-
-  private static int assertCount = 0;
-
-  // Do not instantiate.
-  private DataflowAssert() {}
-
-  /**
-   * Constructs an {@link IterableAssert} for the elements of the provided
-   * {@link PCollection}.
-   */
-  public static <T> IterableAssert<T> that(PCollection<T> actual) {
-    return new IterableAssert<>(
-        new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()),
-         actual.getPipeline())
-         .setCoder(actual.getCoder());
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@link PCollection} which must contain a single {@code Iterable<T>}
-   * value.
-   */
-  public static <T> IterableAssert<T>
-      thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
-
-    List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
-    Coder<T> tCoder;
-    try {
-      @SuppressWarnings("unchecked")
-      Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
-      tCoder = tCoderTmp;
-    } catch (NoSuchElementException | IllegalArgumentException exc) {
-      throw new IllegalArgumentException(
-        "DataflowAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>"
-        + " with a Coder<Iterable<T>> where getCoderArguments() yields a"
-        + " single Coder<T> to apply to the elements.");
-    }
-
-    @SuppressWarnings("unchecked") // Safe covariant cast
-    PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
-
-    return new IterableAssert<>(
-        new CreateActual<Iterable<T>, Iterable<T>>(
-            actualIterables, View.<Iterable<T>>asSingleton()),
-        actual.getPipeline())
-        .setCoder(tCoder);
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@code PCollectionView PCollectionView<Iterable<T>>}.
-   */
-  public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
-    return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
-  }
-
-  /**
-   * Constructs a {@link SingletonAssert} for the value of the provided
-   * {@code PCollection PCollection<T>}, which must be a singleton.
-   */
-  public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
-    return new SingletonAssert<>(
-        new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline())
-        .setCoder(actual.getCoder());
-  }
-
-  /**
-   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
-   *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder<K, V>}.
-   */
-  public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
-      thatMultimap(PCollection<KV<K, V>> actual) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
-  }
-
-  /**
-   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
-   * which must have at most one value per key.
-   *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder<K, V>}.
-   */
-  public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
-  }
-
-  ////////////////////////////////////////////////////////////
-
-  /**
-   * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}.
-   */
-  public static class IterableAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
-    private Optional<Coder<T>> coder;
-
-    protected IterableAssert(
-        PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
-      this.createActual = createActual;
-      this.pipeline = pipeline;
-      this.coder = Optional.absent();
-    }
-
-    /**
-     * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
-    }
-
-    /**
-     * Gets the coder, which may yet be absent.
-     */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert"
-                + " that has not been set yet.");
-      }
-    }
-
-    /**
-     * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
-      pipeline.apply(
-          "DataflowAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
-      return this;
-    }
-
-    /**
-     * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> satisfies(
-        AssertRelation<Iterable<T>, Iterable<T>> relation,
-        final Iterable<T> expectedElements) {
-      pipeline.apply(
-          "DataflowAssert$" + (assertCount++),
-          new TwoSideInputAssert<Iterable<T>, Iterable<T>>(createActual,
-              new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()),
-              relation));
-
-      return this;
-    }
-
-    /**
-     * Applies a {@link SerializableMatcher} to check the elements of the {@code Iterable}.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
-      // Safe covariant cast. Could be elided by changing a lot of this file to use
-      // more flexible bounds.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      SerializableFunction<Iterable<T>, Void> checkerFn =
-        (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      pipeline.apply(
-          "DataflowAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(
-              createActual,
-              checkerFn));
-      return this;
-    }
-
-    private static class MatcherCheckerFn<T> implements SerializableFunction<T, Void> {
-      private SerializableMatcher<T> matcher;
-
-      public MatcherCheckerFn(SerializableMatcher<T> matcher) {
-        this.matcher = matcher;
-      }
-
-      @Override
-      public Void apply(T actual) {
-        assertThat(actual, matcher);
-        return null;
-      }
-    }
-
-    /**
-     * Checks that the {@code Iterable} is empty.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> empty() {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList());
-    }
-
-    /**
-     * @throws UnsupportedOperationException always
-     * @deprecated {@link Object#equals(Object)} is not supported on DataflowAssert objects.
-     *    If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
-     *    instead.
-     */
-    @Deprecated
-    @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException(
-          "If you meant to test object equality, use .containsInAnyOrder instead.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException always.
-     * @deprecated {@link Object#hashCode()} is not supported on DataflowAssert objects.
-     */
-    @Deprecated
-    @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException(
-          String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
-    }
-
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
-    }
-
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    @SafeVarargs
-    public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
-      return satisfies(
-        new AssertContainsInAnyOrderRelation<T>(),
-        Arrays.asList(expectedElements));
-    }
-
-    /**
-     * Checks that the {@code Iterable} contains elements that match the provided matchers,
-     * in any order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    @SafeVarargs
-    final IterableAssert<T> containsInAnyOrder(
-        SerializableMatcher<? super T>... elementMatchers) {
-      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
-    }
-  }
-
-  /**
-   * An assertion about the single value of type {@code T}
-   * associated with a {@link PCollectionView}.
-   */
-  public static class SingletonAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final CreateActual<?, T> createActual;
-    private Optional<Coder<T>> coder;
-
-    protected SingletonAssert(
-        CreateActual<?, T> createActual, Pipeline pipeline) {
-      this.pipeline = pipeline;
-      this.createActual = createActual;
-      this.coder = Optional.absent();
-    }
-
-    /**
-     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
-     * {@link #isEqualTo}.
-     */
-    @Deprecated
-    @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "tests for Java equality of the %s object, not the PCollection in question. "
-                  + "Call a test method, such as isEqualTo.",
-              getClass().getSimpleName()));
-    }
-
-    /**
-     * @throws UnsupportedOperationException always.
-     * @deprecated {@link Object#hashCode()} is not supported on DataflowAssert objects.
-     */
-    @Deprecated
-    @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException(
-          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
-    }
-
-    /**
-     * Sets the coder to use for elements of type {@code T}, as needed
-     * for internal purposes.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
-    }
-
-    /**
-     * Gets the coder, which may yet be absent.
-     */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert that has not been set yet.");
-      }
-    }
-
-    /**
-     * Applies a {@link SerializableFunction} to check the value of this
-     * {@code SingletonAssert}'s view.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
-      pipeline.apply(
-          "DataflowAssert$" + (assertCount++),
-          new OneSideInputAssert<T>(createActual, checkerFn));
-      return this;
-    }
-
-    /**
-     * Applies an {@link AssertRelation} to check the provided relation against the
-     * value of this assert and the provided expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(
-        AssertRelation<T, T> relation,
-        final T expectedValue) {
-      pipeline.apply(
-          "DataflowAssert$" + (assertCount++),
-          new TwoSideInputAssert<T, T>(createActual,
-              new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()),
-              relation));
-
-      return this;
-    }
-
-    /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal
-     * to the expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> isEqualTo(T expectedValue) {
-      return satisfies(new AssertIsEqualToRelation<T>(), expectedValue);
-    }
-
-    /**
-     * Checks that the value of this {@code SingletonAssert}'s view is not equal
-     * to the expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> notEqualTo(T expectedValue) {
-      return satisfies(new AssertNotEqualToRelation<T>(), expectedValue);
-    }
-
-    /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal to
-     * the expected value.
-     *
-     * @deprecated replaced by {@link #isEqualTo}
-     */
-    @Deprecated
-    public SingletonAssert<T> is(T expectedValue) {
-      return isEqualTo(expectedValue);
-    }
-
-  }
-
-  ////////////////////////////////////////////////////////////////////////
-
-  private static class CreateActual<T, ActualT>
-      extends PTransform<PBegin, PCollectionView<ActualT>> {
-
-    private final transient PCollection<T> actual;
-    private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
-
-    private CreateActual(PCollection<T> actual,
-        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
-      this.actual = actual;
-      this.actualView = actualView;
-    }
-
-    @Override
-    public PCollectionView<ActualT> apply(PBegin input) {
-      final Coder<T> coder = actual.getCoder();
-      return actual
-          .apply(Window.<T>into(new GlobalWindows()))
-          .apply(ParDo.of(new DoFn<T, T>() {
-            @Override
-            public void processElement(ProcessContext context) throws CoderException {
-              context.output(CoderUtils.clone(coder, context.element()));
-            }
-          }))
-          .apply(actualView);
-    }
-  }
-
-  private static class CreateExpected<T, ExpectedT>
-      extends PTransform<PBegin, PCollectionView<ExpectedT>> {
-
-    private final Iterable<T> elements;
-    private final Optional<Coder<T>> coder;
-    private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view;
-
-    private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
-        PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) {
-      this.elements = elements;
-      this.coder = coder;
-      this.view = view;
-    }
-
-    @Override
-    public PCollectionView<ExpectedT> apply(PBegin input) {
-      Create.Values<T> createTransform = Create.<T>of(elements);
-      if (coder.isPresent()) {
-        createTransform = createTransform.withCoder(coder.get());
-      }
-      return input.apply(createTransform).apply(view);
-    }
-  }
-
-  private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
-
-    private final PCollectionView<T> view;
-
-    private PreExisting(PCollectionView<T> view) {
-      this.view = view;
-    }
-
-    @Override
-    public PCollectionView<T> apply(PBegin input) {
-      return view;
-    }
-  }
-
-  /**
-   * An assertion checker that takes a single
-   * {@link PCollectionView PCollectionView&lt;ActualT&gt;}
-   * and an assertion over {@code ActualT}, and checks it within a dataflow
-   * pipeline.
-   *
-   * <p>Note that the entire assertion must be serializable. If
-   * you need to make assertions involving multiple inputs
-   * that are each not serializable, use TwoSideInputAssert.
-   *
-   * <p>This is generally useful for assertion functions that
-   * are serializable but whose underlying data may not have a coder.
-   */
-  static class OneSideInputAssert<ActualT>
-      extends PTransform<PBegin, PDone> implements Serializable {
-    private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
-    private final SerializableFunction<ActualT, Void> checkerFn;
-
-    public OneSideInputAssert(
-        PTransform<PBegin, PCollectionView<ActualT>> createActual,
-        SerializableFunction<ActualT, Void> checkerFn) {
-      this.createActual = createActual;
-      this.checkerFn = checkerFn;
-    }
-
-    @Override
-    public PDone apply(PBegin input) {
-      final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
-
-      input
-          .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo.named("RunChecks").withSideInputs(actual)
-              .of(new CheckerDoFn<>(checkerFn, actual)));
-
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
-   * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
-   */
-  private static class CheckerDoFn<ActualT> extends DoFn<Void, Void> {
-    private final SerializableFunction<ActualT, Void> checkerFn;
-    private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-    private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
-    private final PCollectionView<ActualT> actual;
-
-    private CheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn,
-        PCollectionView<ActualT> actual) {
-      this.checkerFn = checkerFn;
-      this.actual = actual;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      try {
-        ActualT actualContents = c.sideInput(actual);
-        checkerFn.apply(actualContents);
-        success.addValue(1);
-      } catch (Throwable t) {
-        LOG.error("DataflowAssert failed expectations.", t);
-        failure.addValue(1);
-        // TODO: allow for metrics to propagate on failure when running a streaming pipeline
-        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-          throw t;
-        }
-      }
-    }
-  }
-
-  /**
-   * An assertion checker that takes a {@link PCollectionView PCollectionView&lt;ActualT&gt;},
-   * a {@link PCollectionView PCollectionView&lt;ExpectedT&gt;}, a relation
-   * over {@code A} and {@code B}, and checks that the relation holds
-   * within a dataflow pipeline.
-   *
-   * <p>This is useful when either/both of {@code A} and {@code B}
-   * are not serializable, but have coders (provided
-   * by the underlying {@link PCollection}s).
-   */
-  static class TwoSideInputAssert<ActualT, ExpectedT>
-      extends PTransform<PBegin, PDone> implements Serializable {
-
-    private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
-    private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
-    private final AssertRelation<ActualT, ExpectedT> relation;
-
-    protected TwoSideInputAssert(
-        PTransform<PBegin, PCollectionView<ActualT>> createActual,
-        PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
-        AssertRelation<ActualT, ExpectedT> relation) {
-      this.createActual = createActual;
-      this.createExpected = createExpected;
-      this.relation = relation;
-    }
-
-    @Override
-    public PDone apply(PBegin input) {
-      final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
-      final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
-
-      input
-          .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo.named("RunChecks").withSideInputs(actual, expected)
-              .of(new CheckerDoFn<>(relation, actual, expected)));
-
-      return PDone.in(input.getPipeline());
-    }
-
-    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Void, Void> {
-      private final Aggregator<Integer, Integer> success =
-          createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-      private final Aggregator<Integer, Integer> failure =
-          createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
-      private final AssertRelation<ActualT, ExpectedT> relation;
-      private final PCollectionView<ActualT> actual;
-      private final PCollectionView<ExpectedT> expected;
-
-      private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
-          PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
-        this.relation = relation;
-        this.actual = actual;
-        this.expected = expected;
-      }
-
-      @Override
-      public void processElement(ProcessContext c) {
-        try {
-          ActualT actualContents = c.sideInput(actual);
-          ExpectedT expectedContents = c.sideInput(expected);
-          relation.assertFor(expectedContents).apply(actualContents);
-          success.addValue(1);
-        } catch (Throwable t) {
-          LOG.error("DataflowAssert failed expectations.", t);
-          failure.addValue(1);
-          // TODO: allow for metrics to propagate on failure when running a streaming pipeline
-          if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-            throw t;
-          }
-        }
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@link SerializableFunction} that verifies that an actual value is equal to an
-   * expected value.
-   */
-  private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
-    private T expected;
-
-    public AssertIsEqualTo(T expected) {
-      this.expected = expected;
-    }
-
-    @Override
-    public Void apply(T actual) {
-      assertThat(actual, equalTo(expected));
-      return null;
-    }
-  }
-
-  /**
-   * A {@link SerializableFunction} that verifies that an actual value is not equal to an
-   * expected value.
-   */
-  private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
-    private T expected;
-
-    public AssertNotEqualTo(T expected) {
-      this.expected = expected;
-    }
-
-    @Override
-    public Void apply(T actual) {
-      assertThat(actual, not(equalTo(expected)));
-      return null;
-    }
-  }
-
-  /**
-   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
-   * expected items in any order.
-   */
-  private static class AssertContainsInAnyOrder<T>
-      implements SerializableFunction<Iterable<T>, Void> {
-    private T[] expected;
-
-    @SafeVarargs
-    public AssertContainsInAnyOrder(T... expected) {
-      this.expected = expected;
-    }
-
-    @SuppressWarnings("unchecked")
-    public AssertContainsInAnyOrder(Collection<T> expected) {
-      this((T[]) expected.toArray());
-    }
-
-    public AssertContainsInAnyOrder(Iterable<T> expected) {
-      this(Lists.<T>newArrayList(expected));
-    }
-
-    @Override
-    public Void apply(Iterable<T> actual) {
-      assertThat(actual, containsInAnyOrder(expected));
-      return null;
-    }
-  }
-
-  ////////////////////////////////////////////////////////////
-
-  /**
-   * A binary predicate between types {@code Actual} and {@code Expected}.
-   * Implemented as a method {@code assertFor(Expected)} which returns
-   * a {@code SerializableFunction<Actual, Void>}
-   * that should verify the assertion..
-   */
-  private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
-    public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
-  }
-
-  /**
-   * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
-   */
-  private static class AssertIsEqualToRelation<T>
-      implements AssertRelation<T, T> {
-    @Override
-    public SerializableFunction<T, Void> assertFor(T expected) {
-      return new AssertIsEqualTo<T>(expected);
-    }
-  }
-
-  /**
-   * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
-   */
-  private static class AssertNotEqualToRelation<T>
-      implements AssertRelation<T, T> {
-    @Override
-    public SerializableFunction<T, Void> assertFor(T expected) {
-      return new AssertNotEqualTo<T>(expected);
-    }
-  }
-
-  /**
-   * An {@code AssertRelation} implementing the binary predicate that two collections are equal
-   * modulo reordering.
-   */
-  private static class AssertContainsInAnyOrderRelation<T>
-      implements AssertRelation<Iterable<T>, Iterable<T>> {
-    @Override
-    public SerializableFunction<Iterable<T>, Void> assertFor(Iterable<T> expectedElements) {
-      return new AssertContainsInAnyOrder<T>(expectedElements);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java
deleted file mode 100644
index 60ab2e5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-/**
- * Category tag for tests that can be run on the
- * {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner} if the
- * {@code runIntegrationTestOnService} System property is set to true.
- * Example usage:
- * <pre><code>
- *     {@literal @}Test
- *     {@literal @}Category(RunnableOnService.class)
- *     public void testParDo() {...
- * </code></pre>
- */
-public interface RunnableOnService {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java
deleted file mode 100644
index 10f221e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import org.hamcrest.Matcher;
-
-import java.io.Serializable;
-
-/**
- * A {@link Matcher} that is also {@link Serializable}.
- *
- * <p>Such matchers can be used with {@link DataflowAssert}, which builds Dataflow pipelines
- * such that these matchers may be serialized and executed remotely.
- *
- * <p>To create a {@code SerializableMatcher}, extend {@link org.hamcrest.BaseMatcher}
- * and also implement this interface.
- *
- * @param <T> The type of value matched.
- */
-interface SerializableMatcher<T> extends Matcher<T>, Serializable {
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
deleted file mode 100644
index da5171e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
+++ /dev/null
@@ -1,1180 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.ListCoder;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * Static class for building and using {@link SerializableMatcher} instances.
- *
- * <p>Most matchers are wrappers for hamcrest's {@link Matchers}. Please be familiar with the
- * documentation there. Values retained by a {@link SerializableMatcher} are required to be
- * serializable, either via Java serialization or via a provided {@link Coder}.
- *
- * <p>The following matchers are novel to Dataflow:
- * <ul>
- * <li>{@link #kvWithKey} for matching just the key of a {@link KV}.
- * <li>{@link #kvWithValue} for matching just the value of a {@link KV}.
- * <li>{@link #kv} for matching the key and value of a {@link KV}.
- * </ul>
- *
- * <p>For example, to match a group from
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}, which has type
- * {@code KV<K, Iterable<V>>} for some {@code K} and {@code V} and where the order of the iterable
- * is undefined, use a matcher like
- * {@code kv(equalTo("some key"), containsInAnyOrder(1, 2, 3))}.
- */
-class SerializableMatchers implements Serializable {
-
-  // Serializable only because of capture by anonymous inner classes
-  private SerializableMatchers() { } // not instantiable
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#allOf(Iterable)}.
-   */
-  public static <T> SerializableMatcher<T>
-  allOf(Iterable<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Iterable<Matcher<? super T>> matchers = (Iterable) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.allOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#allOf(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T> allOf(final SerializableMatcher<T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.allOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anyOf(Iterable)}.
-   */
-  public static <T> SerializableMatcher<T>
-  anyOf(Iterable<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Iterable<Matcher<? super T>> matchers = (Iterable) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.anyOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anyOf(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T> anyOf(final SerializableMatcher<T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.anyOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anything()}.
-   */
-  public static SerializableMatcher<Object> anything() {
-    return fromSupplier(new SerializableSupplier<Matcher<Object>>() {
-      @Override
-      public Matcher<Object> get() {
-        return Matchers.anything();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<T[]>
-  arrayContaining(final T... items) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-   * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]> arrayContaining(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]>
-  arrayContaining(final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.<T>arrayContaining(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(List)}.
-   */
-  public static <T> SerializableMatcher<T[]>
-  arrayContaining(List<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final List<Matcher<? super T>> matchers = (List) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<T[]>
-  arrayContainingInAnyOrder(final T... items) {
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContainingInAnyOrder(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-   * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(
-      final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.<T>arrayContainingInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Collection)}.
-   */
-  public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(
-      Collection<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Collection<Matcher<? super T>> matchers = (Collection) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContainingInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayWithSize(int)}.
-   */
-  public static <T> SerializableMatcher<T[]> arrayWithSize(final int size) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayWithSize(size);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayWithSize(Matcher)}.
-   */
-  public static <T> SerializableMatcher<T[]> arrayWithSize(
-      final SerializableMatcher<? super Integer> sizeMatcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayWithSize(sizeMatcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#closeTo(double,double)}.
-   */
-  public static SerializableMatcher<Double> closeTo(final double target, final double error) {
-    return fromSupplier(new SerializableSupplier<Matcher<Double>>() {
-      @Override
-      public Matcher<Double> get() {
-        return Matchers.closeTo(target, error);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>> contains(
-      final T... items) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.contains(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-   * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>>
-  contains(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>> contains(
-      final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.<T>contains(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(List)}.
-   */
-  public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>> contains(
-      List<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final List<Matcher<? super T>> matchers = (List) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.contains(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>>
-  containsInAnyOrder(final T... items) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>>
-  containsInAnyOrder(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>> containsInAnyOrder(
-      final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.<T>containsInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Collection)}.
-   */
-  public static <T> SerializableMatcher<Iterable<? extends T>> containsInAnyOrder(
-      Collection<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Collection<Matcher<? super T>> matchers = (Collection) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#containsString}.
-   */
-  public static SerializableMatcher<String> containsString(final String substring) {
-    return fromSupplier(new SerializableSupplier<Matcher<String>>() {
-      @Override
-      public Matcher<String> get() {
-        return Matchers.containsString(substring);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#empty()}.
-   */
-  public static <T> SerializableMatcher<Collection<? extends T>> empty() {
-    return fromSupplier(new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-      @Override
-      public Matcher<Collection<? extends T>> get() {
-        return Matchers.empty();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#emptyArray()}.
-   */
-  public static <T> SerializableMatcher<T[]> emptyArray() {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.emptyArray();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#emptyIterable()}.
-   */
-  public static <T> SerializableMatcher<Iterable<? extends T>> emptyIterable() {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.emptyIterable();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#endsWith}.
-   */
-  public static SerializableMatcher<String> endsWith(final String substring) {
-    return fromSupplier(new SerializableSupplier<Matcher<String>>() {
-      @Override
-      public Matcher<String> get() {
-        return Matchers.endsWith(substring);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#equalTo()}.
-   */
-  public static <T extends Serializable> SerializableMatcher<T> equalTo(final T expected) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.equalTo(expected);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#equalTo()}.
-   *
-   * <p>The expected value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<T> equalTo(Coder<T> coder, T expected) {
-
-    final SerializableSupplier<T> expectedSupplier = new SerializableViaCoder<>(coder, expected);
-
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.equalTo(expectedSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#greaterThan()}.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
-  greaterThan(final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThan(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#greaterThan()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
-  greaterThan(final Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThan(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#greaterThanOrEqualTo()}.
-   */
-  public static <T extends Comparable<T>> SerializableMatcher<T> greaterThanOrEqualTo(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThanOrEqualTo(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#greaterThanOrEqualTo()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
-  greaterThanOrEqualTo(final Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThanOrEqualTo(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasItem(Object)}.
-   */
-  public static <T extends Serializable> SerializableMatcher<Iterable<? super T>> hasItem(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-      @Override
-      public Matcher<Iterable<? super T>> get() {
-        return Matchers.hasItem(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasItem(Object)}.
-   *
-   * <p>The item of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<Iterable<? super T>> hasItem(Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-      @Override
-      public Matcher<Iterable<? super T>> get() {
-        return Matchers.hasItem(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasItem(Matcher)}.
-   */
-  public static <T> SerializableMatcher<Iterable<? super T>> hasItem(
-      final SerializableMatcher<? super T> matcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-      @Override
-      public Matcher<Iterable<? super T>> get() {
-        return Matchers.hasItem(matcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasSize(int)}.
-   */
-  public static <T> SerializableMatcher<Collection<? extends T>> hasSize(final int size) {
-    return fromSupplier(new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-      @Override
-      public Matcher<Collection<? extends T>> get() {
-        return Matchers.hasSize(size);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasSize(Matcher)}.
-   */
-  public static <T> SerializableMatcher<Collection<? extends T>> hasSize(
-      final SerializableMatcher<? super Integer> sizeMatcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-      @Override
-      public Matcher<Collection<? extends T>> get() {
-        return Matchers.hasSize(sizeMatcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#iterableWithSize(int)}.
-   */
-  public static <T> SerializableMatcher<Iterable<T>> iterableWithSize(final int size) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<T>>>() {
-      @Override
-      public Matcher<Iterable<T>> get() {
-        return Matchers.iterableWithSize(size);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#iterableWithSize(Matcher)}.
-   */
-  public static <T> SerializableMatcher<Iterable<T>> iterableWithSize(
-      final SerializableMatcher<? super Integer> sizeMatcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<T>>>() {
-      @Override
-      public Matcher<Iterable<T>> get() {
-        return Matchers.iterableWithSize(sizeMatcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Collection)}.
-   */
-  public static <T extends Serializable> SerializableMatcher<T>
-  isIn(final Collection<T> collection) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(collection);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Collection)}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * They are explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<T> isIn(Coder<T> coder, Collection<T> collection) {
-    @SuppressWarnings("unchecked")
-    T[] items = (T[]) collection.toArray();
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Object[])}.
-   */
-  public static <T extends Serializable> SerializableMatcher<T> isIn(final T[] items) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * They are explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<T> isIn(Coder<T> coder, T[] items) {
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isOneOf}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<T> isOneOf(final T... elems) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isOneOf(elems);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isOneOf}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * They are explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T> isOneOf(Coder<T> coder, T... items) {
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isOneOf(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified key.
-   */
-  public static <K extends Serializable, V> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithKey(K key) {
-    return new KvKeyMatcher<K, V>(equalTo(key));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified key.
-   *
-   * <p>The key of type {@code K} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithKey(Coder<K> coder, K key) {
-    return new KvKeyMatcher<K, V>(equalTo(coder, key));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with matching key.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kvWithKey(
-      final SerializableMatcher<? super K> keyMatcher) {
-    return new KvKeyMatcher<K, V>(keyMatcher);
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified value.
-   */
-  public static <K, V extends Serializable> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithValue(V value) {
-    return new KvValueMatcher<K, V>(equalTo(value));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified value.
-   *
-   * <p>The value of type {@code V} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithValue(Coder<V> coder, V value) {
-    return new KvValueMatcher<K, V>(equalTo(coder, value));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with matching value.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kvWithValue(
-      final SerializableMatcher<? super V> valueMatcher) {
-    return new KvValueMatcher<>(valueMatcher);
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with matching key and value.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kv(
-      final SerializableMatcher<? super K> keyMatcher,
-      final SerializableMatcher<? super V> valueMatcher) {
-
-    return SerializableMatchers.<KV<? extends K, ? extends V>>allOf(
-        SerializableMatchers.<K, V>kvWithKey(keyMatcher),
-        SerializableMatchers.<K, V>kvWithValue(valueMatcher));
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#lessThan()}.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T> lessThan(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThan(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#lessThan()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T>> SerializableMatcher<T>
-  lessThan(Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThan(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#lessThanOrEqualTo()}.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T> lessThanOrEqualTo(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThanOrEqualTo(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#lessThanOrEqualTo()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T>> SerializableMatcher<T> lessThanOrEqualTo(
-      Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThanOrEqualTo(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#not}.
-   */
-  public static <T> SerializableMatcher<T> not(final SerializableMatcher<T> matcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.not(matcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#nullValue}.
-   */
-  public static SerializableMatcher<Object> nullValue() {
-    return fromSupplier(new SerializableSupplier<Matcher<Object>>() {
-      @Override
-      public Matcher<Object> get() {
-        return Matchers.nullValue();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#startsWith}.
-   */
-  public static SerializableMatcher<String> startsWith(final String substring) {
-    return fromSupplier(new SerializableSupplier<Matcher<String>>() {
-      @Override
-      public Matcher<String> get() {
-        return Matchers.startsWith(substring);
-      }
-    });
-  }
-
-  private static class KvKeyMatcher<K, V>
-  extends BaseMatcher<KV<? extends K, ? extends V>>
-  implements SerializableMatcher<KV<? extends K, ? extends V>> {
-    private final SerializableMatcher<? super K> keyMatcher;
-
-    public KvKeyMatcher(SerializableMatcher<? super K> keyMatcher) {
-      this.keyMatcher = keyMatcher;
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      @SuppressWarnings("unchecked")
-      KV<K, ?> kvItem = (KV<K, ?>) item;
-      return keyMatcher.matches(kvItem.getKey());
-    }
-
-    @Override
-    public void describeMismatch(Object item, Description mismatchDescription) {
-      @SuppressWarnings("unchecked")
-      KV<K, ?> kvItem = (KV<K, ?>) item;
-      if (!keyMatcher.matches(kvItem.getKey())) {
-        mismatchDescription.appendText("key did not match: ");
-        keyMatcher.describeMismatch(kvItem.getKey(), mismatchDescription);
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("KV with key matching ");
-      keyMatcher.describeTo(description);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .addValue(keyMatcher)
-          .toString();
-    }
-  }
-
-  private static class KvValueMatcher<K, V>
-  extends BaseMatcher<KV<? extends K, ? extends V>>
-  implements SerializableMatcher<KV<? extends K, ? extends V>> {
-    private final SerializableMatcher<? super V> valueMatcher;
-
-    public KvValueMatcher(SerializableMatcher<? super V> valueMatcher) {
-      this.valueMatcher = valueMatcher;
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      @SuppressWarnings("unchecked")
-      KV<?, V> kvItem = (KV<?, V>) item;
-      return valueMatcher.matches(kvItem.getValue());
-    }
-
-    @Override
-    public void describeMismatch(Object item, Description mismatchDescription) {
-      @SuppressWarnings("unchecked")
-      KV<?, V> kvItem = (KV<?, V>) item;
-      if (!valueMatcher.matches(kvItem.getValue())) {
-        mismatchDescription.appendText("value did not match: ");
-        valueMatcher.describeMismatch(kvItem.getValue(), mismatchDescription);
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("KV with value matching ");
-      valueMatcher.describeTo(description);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .addValue(valueMatcher)
-          .toString();
-    }
-  }
-
-  /**
-   * Constructs a {@link SerializableMatcher} from a non-serializable {@link Matcher} via
-   * indirection through {@link SerializableSupplier}.
-   *
-   * <p>To wrap a {@link Matcher} which is not serializable, provide a {@link SerializableSupplier}
-   * with a {@link SerializableSupplier#get()} method that returns a fresh instance of the
-   * {@link Matcher} desired. The resulting {@link SerializableMatcher} will behave according to
-   * the {@link Matcher} returned by {@link SerializableSupplier#get() get()} when it is invoked
-   * during matching (which may occur on another machine, such as a Dataflow worker).
-   *
-   * <code>
-   * return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-   *   *     @Override
-   *     public Matcher<T> get() {
-   *       return new MyMatcherForT();
-   *     }
-   * });
-   * </code>
-   */
-  public static <T> SerializableMatcher<T> fromSupplier(
-      SerializableSupplier<Matcher<T>> supplier) {
-    return new SerializableMatcherFromSupplier<>(supplier);
-  }
-
-  /**
-   * Supplies values of type {@code T}, and is serializable. Thus, even if {@code T} is not
-   * serializable, the supplier can be serialized and provide a {@code T} wherever it is
-   * deserialized.
-   *
-   * @param <T> the type of value supplied.
-   */
-  public interface SerializableSupplier<T> extends Serializable {
-    T get();
-  }
-
-  /**
-   * Since the delegate {@link Matcher} is not generally serializable, instead this takes a nullary
-   * SerializableFunction to return such a matcher.
-   */
-  private static class SerializableMatcherFromSupplier<T> extends BaseMatcher<T>
-  implements SerializableMatcher<T> {
-
-    private SerializableSupplier<Matcher<T>> supplier;
-
-    public SerializableMatcherFromSupplier(SerializableSupplier<Matcher<T>> supplier) {
-      this.supplier = supplier;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      supplier.get().describeTo(description);
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      return supplier.get().matches(item);
-    }
-
-    @Override
-    public void describeMismatch(Object item, Description mismatchDescription) {
-      supplier.get().describeMismatch(item, mismatchDescription);
-    }
-  }
-
-  /**
-   * Wraps any value that can be encoded via a {@link Coder} to make it {@link Serializable}.
-   * This is not likely to be a good encoding, so should be used only for tests, where data
-   * volume is small and minor costs are not critical.
-   */
-  private static class SerializableViaCoder<T> implements SerializableSupplier<T> {
-    /** Cached value that is not serialized. */
-    @Nullable
-    private transient T value;
-
-    /** The bytes of {@link #value} when encoded via {@link #coder}. */
-    private byte[] encodedValue;
-
-    private Coder<T> coder;
-
-    public SerializableViaCoder(Coder<T> coder, T value) {
-      this.coder = coder;
-      this.value = value;
-      try {
-        this.encodedValue = CoderUtils.encodeToByteArray(coder, value);
-      } catch (CoderException exc) {
-        throw new RuntimeException("Error serializing via Coder", exc);
-      }
-    }
-
-    @Override
-    public T get() {
-      if (value == null) {
-        try {
-          value = CoderUtils.decodeFromByteArray(coder, encodedValue);
-        } catch (CoderException exc) {
-          throw new RuntimeException("Error deserializing via Coder", exc);
-        }
-      }
-      return value;
-    }
-  }
-
-  /**
-   * Wraps any array with values that can be encoded via a {@link Coder} to make it
-   * {@link Serializable}. This is not likely to be a good encoding, so should be used only for
-   * tests, where data volume is small and minor costs are not critical.
-   */
-  private static class SerializableArrayViaCoder<T> implements SerializableSupplier<T[]> {
-    /** Cached value that is not serialized. */
-    @Nullable
-    private transient T[] value;
-
-    /** The bytes of {@link #value} when encoded via {@link #coder}. */
-    private byte[] encodedValue;
-
-    private Coder<List<T>> coder;
-
-    public SerializableArrayViaCoder(Coder<T> elementCoder, T[] value) {
-      this.coder = ListCoder.of(elementCoder);
-      this.value = value;
-      try {
-        this.encodedValue = CoderUtils.encodeToByteArray(coder, Arrays.asList(value));
-      } catch (CoderException exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public T[] get() {
-      if (value == null) {
-        try {
-          @SuppressWarnings("unchecked")
-          T[] decoded = (T[]) CoderUtils.decodeFromByteArray(coder, encodedValue).toArray();
-          value = decoded;
-        } catch (CoderException exc) {
-          throw new RuntimeException("Error deserializing via Coder", exc);
-        }
-      }
-      return value;
-    }
-  }
-}