You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:52 UTC
[28/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java
deleted file mode 100644
index 6c9643c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/DataflowAssert.java
+++ /dev/null
@@ -1,825 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.MapCoder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * An assertion on the contents of a {@link PCollection}
- * incorporated into the pipeline. Such an assertion
- * can be checked no matter what kind of {@link PipelineRunner} is
- * used.
- *
- * <p>Note that the {@code DataflowAssert} call must precede the call
- * to {@link Pipeline#run}.
- *
- * <p>Examples of use:
- * <pre>{@code
- * Pipeline p = TestPipeline.create();
- * ...
- * PCollection<String> output =
- * input
- * .apply(ParDo.of(new TestDoFn()));
- * DataflowAssert.that(output)
- * .containsInAnyOrder("out1", "out2", "out3");
- * ...
- * PCollection<Integer> ints = ...
- * PCollection<Integer> sum =
- * ints
- * .apply(Combine.globally(new SumInts()));
- * DataflowAssert.that(sum)
- * .is(42);
- * ...
- * p.run();
- * }</pre>
- *
- * <p>JUnit and Hamcrest must be linked in by any code that uses DataflowAssert.
- */
-public class DataflowAssert {
-
- private static final Logger LOG = LoggerFactory.getLogger(DataflowAssert.class);
-
- static final String SUCCESS_COUNTER = "DataflowAssertSuccess";
- static final String FAILURE_COUNTER = "DataflowAssertFailure";
-
- private static int assertCount = 0;
-
- // Do not instantiate.
- private DataflowAssert() {}
-
- /**
- * Constructs an {@link IterableAssert} for the elements of the provided
- * {@link PCollection}.
- */
- public static <T> IterableAssert<T> that(PCollection<T> actual) {
- return new IterableAssert<>(
- new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()),
- actual.getPipeline())
- .setCoder(actual.getCoder());
- }
-
- /**
- * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection},
- * which must contain a single {@code Iterable<T>} value. Throws
- * {@link IllegalArgumentException} unless its coder has exactly one coder argument.
- */
- public static <T> IterableAssert<T>
- thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
- // The element coder is taken to be the sole coder argument of the collection's coder.
- List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
- Coder<T> tCoder;
- try {
- @SuppressWarnings("unchecked")
- Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
- tCoder = tCoderTmp;
- } catch (NoSuchElementException | IllegalArgumentException exc) {
- throw new IllegalArgumentException(
- "DataflowAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>"
- + " with a Coder<Iterable<T>> where getCoderArguments() yields a"
- + " single Coder<T> to apply to the elements.", exc);
- }
-
- @SuppressWarnings("unchecked") // Safe covariant cast
- PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
-
- return new IterableAssert<>(
- new CreateActual<Iterable<T>, Iterable<T>>(
- actualIterables, View.<Iterable<T>>asSingleton()),
- actual.getPipeline())
- .setCoder(tCoder);
- }
-
- /**
- * Constructs an {@link IterableAssert} for the value of the provided
- * {@code PCollectionView<Iterable<T>>}.
- */
- public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
- return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
- }
-
- /**
- * Constructs a {@link SingletonAssert} for the value of the provided
- * {@code PCollection<T>}, which must be a singleton.
- */
- public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
- return new SingletonAssert<>(
- new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline())
- .setCoder(actual.getCoder());
- }
-
- /**
- * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
- *
- * <p>Note that the actual value must be coded by a {@link KvCoder},
- * not just any {@code Coder<KV<K, V>>}.
- */
- public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
- thatMultimap(PCollection<KV<K, V>> actual) {
- @SuppressWarnings("unchecked")
- KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
- return new SingletonAssert<>(
- new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline())
- .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
- }
-
- /**
- * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
- * which must have at most one value per key.
- *
- * <p>Note that the actual value must be coded by a {@link KvCoder},
- * not just any {@code Coder<KV<K, V>>}.
- */
- public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
- @SuppressWarnings("unchecked")
- KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
- return new SingletonAssert<>(
- new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline())
- .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
- }
-
- ////////////////////////////////////////////////////////////
-
- /**
- * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}.
- */
- public static class IterableAssert<T> implements Serializable {
- private final Pipeline pipeline;
- private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
- private Optional<Coder<T>> coder;
-
- protected IterableAssert(
- PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
- this.createActual = createActual;
- this.pipeline = pipeline;
- this.coder = Optional.absent();
- }
-
- /**
- * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
- this.coder = Optional.fromNullable(coderOrNull);
- return this;
- }
-
- /**
- * Gets the coder, throwing {@link IllegalStateException} if it has not been set yet.
- */
- public Coder<T> getCoder() {
- if (coder.isPresent()) {
- return coder.get();
- } else {
- throw new IllegalStateException(
- "Attempting to access the coder of an IterableAssert"
- + " that has not been set yet.");
- }
- }
-
- /**
- * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
- pipeline.apply(
- "DataflowAssert$" + (assertCount++),
- new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
- return this;
- }
-
- /**
- * Applies an {@link AssertRelation} to check the {@code Iterable} against expected elements.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- public IterableAssert<T> satisfies(
- AssertRelation<Iterable<T>, Iterable<T>> relation,
- final Iterable<T> expectedElements) {
- pipeline.apply(
- "DataflowAssert$" + (assertCount++),
- new TwoSideInputAssert<Iterable<T>, Iterable<T>>(createActual,
- new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()),
- relation));
-
- return this;
- }
-
- /**
- * Applies a {@link SerializableMatcher} to check the elements of the {@code Iterable}.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
- // Safe covariant cast. Could be elided by changing a lot of this file to use
- // more flexible bounds.
- @SuppressWarnings({"rawtypes", "unchecked"})
- SerializableFunction<Iterable<T>, Void> checkerFn =
- (SerializableFunction) new MatcherCheckerFn<>(matcher);
- pipeline.apply(
- "DataflowAssert$" + (assertCount++),
- new OneSideInputAssert<Iterable<T>>(
- createActual,
- checkerFn));
- return this;
- }
-
- private static class MatcherCheckerFn<T> implements SerializableFunction<T, Void> {
- private SerializableMatcher<T> matcher;
-
- public MatcherCheckerFn(SerializableMatcher<T> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public Void apply(T actual) {
- assertThat(actual, matcher);
- return null;
- }
- }
-
- /**
- * Checks that the {@code Iterable} is empty.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- public IterableAssert<T> empty() {
- return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList());
- }
-
- /**
- * @throws UnsupportedOperationException always
- * @deprecated {@link Object#equals(Object)} is not supported on DataflowAssert objects.
- * If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
- * instead.
- */
- @Deprecated
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- "If you meant to test object equality, use .containsInAnyOrder instead.");
- }
-
- /**
- * @throws UnsupportedOperationException always.
- * @deprecated {@link Object#hashCode()} is not supported on DataflowAssert objects.
- */
- @Deprecated
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
- }
-
- /**
- * Checks that the {@code Iterable} contains the expected elements, in any
- * order.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
- return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
- }
-
- /**
- * Checks that the {@code Iterable} contains the expected elements, in any
- * order.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- @SafeVarargs
- public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
- return satisfies(
- new AssertContainsInAnyOrderRelation<T>(),
- Arrays.asList(expectedElements));
- }
-
- /**
- * Checks that the {@code Iterable} contains elements that match the provided matchers,
- * in any order.
- *
- * <p>Returns this {@code IterableAssert}.
- */
- @SafeVarargs
- final IterableAssert<T> containsInAnyOrder(
- SerializableMatcher<? super T>... elementMatchers) {
- return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
- }
- }
-
- /**
- * An assertion about the single value of type {@code T}
- * associated with a {@link PCollectionView}.
- */
- public static class SingletonAssert<T> implements Serializable {
- private final Pipeline pipeline;
- private final CreateActual<?, T> createActual;
- private Optional<Coder<T>> coder;
-
- protected SingletonAssert(
- CreateActual<?, T> createActual, Pipeline pipeline) {
- this.pipeline = pipeline;
- this.createActual = createActual;
- this.coder = Optional.absent();
- }
-
- /**
- * Always throws an {@link UnsupportedOperationException}: users are probably looking for
- * {@link #isEqualTo}.
- */
- @Deprecated
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- String.format(
- "tests for Java equality of the %s object, not the PCollection in question. "
- + "Call a test method, such as isEqualTo.",
- getClass().getSimpleName()));
- }
-
- /**
- * @throws UnsupportedOperationException always.
- * @deprecated {@link Object#hashCode()} is not supported on DataflowAssert objects.
- */
- @Deprecated
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
- }
-
- /**
- * Sets the coder to use for elements of type {@code T}, as needed
- * for internal purposes.
- *
- * <p>Returns this {@code SingletonAssert}.
- */
- public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
- this.coder = Optional.fromNullable(coderOrNull);
- return this;
- }
-
- /**
- * Gets the coder, throwing {@link IllegalStateException} if it has not been set yet.
- */
- public Coder<T> getCoder() {
- if (coder.isPresent()) {
- return coder.get();
- } else {
- throw new IllegalStateException(
- "Attempting to access the coder of a SingletonAssert that has not been set yet.");
- }
- }
-
- /**
- * Applies a {@link SerializableFunction} to check the value of this
- * {@code SingletonAssert}'s view.
- *
- * <p>Returns this {@code SingletonAssert}.
- */
- public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
- pipeline.apply(
- "DataflowAssert$" + (assertCount++),
- new OneSideInputAssert<T>(createActual, checkerFn));
- return this;
- }
-
- /**
- * Applies an {@link AssertRelation} to check the provided relation against the
- * value of this assert and the provided expected value.
- *
- * <p>Returns this {@code SingletonAssert}.
- */
- public SingletonAssert<T> satisfies(
- AssertRelation<T, T> relation,
- final T expectedValue) {
- pipeline.apply(
- "DataflowAssert$" + (assertCount++),
- new TwoSideInputAssert<T, T>(createActual,
- new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()),
- relation));
-
- return this;
- }
-
- /**
- * Checks that the value of this {@code SingletonAssert}'s view is equal
- * to the expected value.
- *
- * <p>Returns this {@code SingletonAssert}.
- */
- public SingletonAssert<T> isEqualTo(T expectedValue) {
- return satisfies(new AssertIsEqualToRelation<T>(), expectedValue);
- }
-
- /**
- * Checks that the value of this {@code SingletonAssert}'s view is not equal
- * to the expected value.
- *
- * <p>Returns this {@code SingletonAssert}.
- */
- public SingletonAssert<T> notEqualTo(T expectedValue) {
- return satisfies(new AssertNotEqualToRelation<T>(), expectedValue);
- }
-
- /**
- * Checks that the value of this {@code SingletonAssert}'s view is equal to
- * the expected value.
- *
- * @deprecated replaced by {@link #isEqualTo}
- */
- @Deprecated
- public SingletonAssert<T> is(T expectedValue) {
- return isEqualTo(expectedValue);
- }
-
- }
-
- ////////////////////////////////////////////////////////////////////////
-
- private static class CreateActual<T, ActualT>
- extends PTransform<PBegin, PCollectionView<ActualT>> {
-
- private final transient PCollection<T> actual;
- private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
-
- private CreateActual(PCollection<T> actual,
- PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
- this.actual = actual;
- this.actualView = actualView;
- }
-
- @Override
- public PCollectionView<ActualT> apply(PBegin input) {
- final Coder<T> coder = actual.getCoder();
- return actual
- .apply(Window.<T>into(new GlobalWindows()))
- .apply(ParDo.of(new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext context) throws CoderException {
- context.output(CoderUtils.clone(coder, context.element()));
- }
- }))
- .apply(actualView);
- }
- }
-
- private static class CreateExpected<T, ExpectedT>
- extends PTransform<PBegin, PCollectionView<ExpectedT>> {
-
- private final Iterable<T> elements;
- private final Optional<Coder<T>> coder;
- private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view;
-
- private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
- PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) {
- this.elements = elements;
- this.coder = coder;
- this.view = view;
- }
-
- @Override
- public PCollectionView<ExpectedT> apply(PBegin input) {
- Create.Values<T> createTransform = Create.<T>of(elements);
- if (coder.isPresent()) {
- createTransform = createTransform.withCoder(coder.get());
- }
- return input.apply(createTransform).apply(view);
- }
- }
-
- private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
-
- private final PCollectionView<T> view;
-
- private PreExisting(PCollectionView<T> view) {
- this.view = view;
- }
-
- @Override
- public PCollectionView<T> apply(PBegin input) {
- return view;
- }
- }
-
- /**
- * An assertion checker that takes a single
- * {@link PCollectionView PCollectionView<ActualT>}
- * and an assertion over {@code ActualT}, and checks it within a dataflow
- * pipeline.
- *
- * <p>Note that the entire assertion must be serializable. If
- * you need to make assertions involving multiple inputs
- * that are each not serializable, use TwoSideInputAssert.
- *
- * <p>This is generally useful for assertion functions that
- * are serializable but whose underlying data may not have a coder.
- */
- static class OneSideInputAssert<ActualT>
- extends PTransform<PBegin, PDone> implements Serializable {
- private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
- private final SerializableFunction<ActualT, Void> checkerFn;
-
- public OneSideInputAssert(
- PTransform<PBegin, PCollectionView<ActualT>> createActual,
- SerializableFunction<ActualT, Void> checkerFn) {
- this.createActual = createActual;
- this.checkerFn = checkerFn;
- }
-
- @Override
- public PDone apply(PBegin input) {
- final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
-
- input
- .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.named("RunChecks").withSideInputs(actual)
- .of(new CheckerDoFn<>(checkerFn, actual)));
-
- return PDone.in(input.getPipeline());
- }
- }
-
- /**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
- * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
- */
- private static class CheckerDoFn<ActualT> extends DoFn<Void, Void> {
- private final SerializableFunction<ActualT, Void> checkerFn;
- private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
- private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
- private final PCollectionView<ActualT> actual;
-
- private CheckerDoFn(
- SerializableFunction<ActualT, Void> checkerFn,
- PCollectionView<ActualT> actual) {
- this.checkerFn = checkerFn;
- this.actual = actual;
- }
-
- @Override
- public void processElement(ProcessContext c) {
- try {
- ActualT actualContents = c.sideInput(actual);
- checkerFn.apply(actualContents);
- success.addValue(1);
- } catch (Throwable t) {
- LOG.error("DataflowAssert failed expectations.", t);
- failure.addValue(1);
- // TODO: allow for metrics to propagate on failure when running a streaming pipeline
- if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
- throw t;
- }
- }
- }
- }
-
- /**
- * An assertion checker that takes a {@link PCollectionView PCollectionView<ActualT>},
- * a {@link PCollectionView PCollectionView<ExpectedT>}, a relation
- * over {@code ActualT} and {@code ExpectedT}, and checks that the relation holds
- * within a dataflow pipeline.
- *
- * <p>This is useful when either/both of {@code ActualT} and {@code ExpectedT}
- * are not serializable, but have coders (provided
- * by the underlying {@link PCollection}s).
- */
- static class TwoSideInputAssert<ActualT, ExpectedT>
- extends PTransform<PBegin, PDone> implements Serializable {
-
- private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
- private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
- private final AssertRelation<ActualT, ExpectedT> relation;
-
- protected TwoSideInputAssert(
- PTransform<PBegin, PCollectionView<ActualT>> createActual,
- PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
- AssertRelation<ActualT, ExpectedT> relation) {
- this.createActual = createActual;
- this.createExpected = createExpected;
- this.relation = relation;
- }
-
- @Override
- public PDone apply(PBegin input) {
- final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
- final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
-
- input
- .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.named("RunChecks").withSideInputs(actual, expected)
- .of(new CheckerDoFn<>(relation, actual, expected)));
-
- return PDone.in(input.getPipeline());
- }
-
- private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Void, Void> {
- private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
- private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
- private final AssertRelation<ActualT, ExpectedT> relation;
- private final PCollectionView<ActualT> actual;
- private final PCollectionView<ExpectedT> expected;
-
- private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
- PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
- this.relation = relation;
- this.actual = actual;
- this.expected = expected;
- }
-
- @Override
- public void processElement(ProcessContext c) {
- try {
- ActualT actualContents = c.sideInput(actual);
- ExpectedT expectedContents = c.sideInput(expected);
- relation.assertFor(expectedContents).apply(actualContents);
- success.addValue(1);
- } catch (Throwable t) {
- LOG.error("DataflowAssert failed expectations.", t);
- failure.addValue(1);
- // TODO: allow for metrics to propagate on failure when running a streaming pipeline
- if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
- throw t;
- }
- }
- }
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@link SerializableFunction} that verifies that an actual value is equal to an
- * expected value.
- */
- private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
- private T expected;
-
- public AssertIsEqualTo(T expected) {
- this.expected = expected;
- }
-
- @Override
- public Void apply(T actual) {
- assertThat(actual, equalTo(expected));
- return null;
- }
- }
-
- /**
- * A {@link SerializableFunction} that verifies that an actual value is not equal to an
- * expected value.
- */
- private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
- private T expected;
-
- public AssertNotEqualTo(T expected) {
- this.expected = expected;
- }
-
- @Override
- public Void apply(T actual) {
- assertThat(actual, not(equalTo(expected)));
- return null;
- }
- }
-
- /**
- * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
- * expected items in any order.
- */
- private static class AssertContainsInAnyOrder<T>
- implements SerializableFunction<Iterable<T>, Void> {
- private T[] expected;
-
- @SafeVarargs
- public AssertContainsInAnyOrder(T... expected) {
- this.expected = expected;
- }
-
- @SuppressWarnings("unchecked")
- public AssertContainsInAnyOrder(Collection<T> expected) {
- this((T[]) expected.toArray());
- }
-
- public AssertContainsInAnyOrder(Iterable<T> expected) {
- this(Lists.<T>newArrayList(expected));
- }
-
- @Override
- public Void apply(Iterable<T> actual) {
- assertThat(actual, containsInAnyOrder(expected));
- return null;
- }
- }
-
- ////////////////////////////////////////////////////////////
-
- /**
- * A binary predicate between types {@code ActualT} and {@code ExpectedT}.
- * Implemented as a method {@code assertFor(ExpectedT)} which returns
- * a {@code SerializableFunction<ActualT, Void>}
- * that should verify the assertion.
- */
- private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
- public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
- }
-
- /**
- * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
- */
- private static class AssertIsEqualToRelation<T>
- implements AssertRelation<T, T> {
- @Override
- public SerializableFunction<T, Void> assertFor(T expected) {
- return new AssertIsEqualTo<T>(expected);
- }
- }
-
- /**
- * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
- */
- private static class AssertNotEqualToRelation<T>
- implements AssertRelation<T, T> {
- @Override
- public SerializableFunction<T, Void> assertFor(T expected) {
- return new AssertNotEqualTo<T>(expected);
- }
- }
-
- /**
- * An {@code AssertRelation} implementing the binary predicate that two collections are equal
- * modulo reordering.
- */
- private static class AssertContainsInAnyOrderRelation<T>
- implements AssertRelation<Iterable<T>, Iterable<T>> {
- @Override
- public SerializableFunction<Iterable<T>, Void> assertFor(Iterable<T> expectedElements) {
- return new AssertContainsInAnyOrder<T>(expectedElements);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java
deleted file mode 100644
index 60ab2e5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/RunnableOnService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-/**
- * Category tag for tests that can be run on the
- * {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner} if the
- * {@code runIntegrationTestOnService} System property is set to true.
- * Example usage:
- * <pre><code>
- * {@literal @}Test
- * {@literal @}Category(RunnableOnService.class)
- * public void testParDo() {...
- * </code></pre>
- */
-public interface RunnableOnService {}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java
deleted file mode 100644
index 10f221e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatcher.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import org.hamcrest.Matcher;
-
-import java.io.Serializable;
-
-/**
- * A {@link Matcher} that is also {@link Serializable}.
- *
- * <p>Such matchers can be used with {@link DataflowAssert}, which builds Dataflow pipelines
- * such that these matchers may be serialized and executed remotely.
- *
- * <p>To create a {@code SerializableMatcher}, extend {@link org.hamcrest.BaseMatcher}
- * and also implement this interface.
- *
- * @param <T> The type of value matched.
- */
-interface SerializableMatcher<T> extends Matcher<T>, Serializable {
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
deleted file mode 100644
index da5171e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
+++ /dev/null
@@ -1,1180 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.ListCoder;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * Static class for building and using {@link SerializableMatcher} instances.
- *
- * <p>Most matchers are wrappers for hamcrest's {@link Matchers}. Please be familiar with the
- * documentation there. Values retained by a {@link SerializableMatcher} are required to be
- * serializable, either via Java serialization or via a provided {@link Coder}.
- *
- * <p>The following matchers are novel to Dataflow:
- * <ul>
- * <li>{@link #kvWithKey} for matching just the key of a {@link KV}.
- * <li>{@link #kvWithValue} for matching just the value of a {@link KV}.
- * <li>{@link #kv} for matching the key and value of a {@link KV}.
- * </ul>
- *
- * <p>For example, to match a group from
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}, which has type
- * {@code KV<K, Iterable<V>>} for some {@code K} and {@code V} and where the order of the iterable
- * is undefined, use a matcher like
- * {@code kv(equalTo("some key"), containsInAnyOrder(1, 2, 3))}.
- */
-class SerializableMatchers implements Serializable {
-
- // The enclosing class implements Serializable only because of capture by anonymous inner
- // classes. The private constructor keeps code outside this class from instantiating it.
- private SerializableMatchers() { } // not instantiable
-
- /**
- * A {@link SerializableMatcher} with identical criteria to {@link Matchers#allOf(Iterable)}.
- */
- public static <T> SerializableMatcher<T>
- allOf(Iterable<SerializableMatcher<? super T>> serializableMatchers) {
-
- @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
- final Iterable<Matcher<? super T>> matchers = (Iterable) serializableMatchers;
-
- return fromSupplier(new SerializableSupplier<Matcher<T>>() {
- @Override
- public Matcher<T> get() {
- return Matchers.allOf(matchers);
- }
- });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#allOf(Matcher[])}.
-  *
-  * @param matchers the matchers handed to {@code Matchers.allOf}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<T> allOf(final SerializableMatcher<T>... matchers) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.allOf(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anyOf(Iterable)}.
- */
- public static <T> SerializableMatcher<T>
- anyOf(Iterable<SerializableMatcher<? super T>> serializableMatchers) {
-
- @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
- final Iterable<Matcher<? super T>> matchers = (Iterable) serializableMatchers;
-
- return fromSupplier(new SerializableSupplier<Matcher<T>>() {
- @Override
- public Matcher<T> get() {
- return Matchers.anyOf(matchers);
- }
- });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#anyOf(Matcher[])}.
-  *
-  * @param matchers the matchers handed to {@code Matchers.anyOf}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<T> anyOf(final SerializableMatcher<T>... matchers) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.anyOf(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#anything()}.
-  *
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static SerializableMatcher<Object> anything() {
-   final SerializableSupplier<Matcher<Object>> supplier =
-       new SerializableSupplier<Matcher<Object>>() {
-         @Override
-         public Matcher<Object> get() {
-           return Matchers.anything();
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to
- * {@link Matchers#arrayContaining(Object[])}.
- */
- @SafeVarargs
- public static <T extends Serializable> SerializableMatcher<T[]>
- arrayContaining(final T... items) {
- return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
- @Override
- public Matcher<T[]> get() {
- return Matchers.arrayContaining(items);
- }
- });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#arrayContaining(Object[])}.
-  *
-  * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-  * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-  *
-  * @param coder the coder used for the items
-  * @param items the expected array elements
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<T[]> arrayContaining(Coder<T> coder, T... items) {
-   final SerializableSupplier<T[]> codedItems = new SerializableArrayViaCoder<>(coder, items);
-
-   final SerializableSupplier<Matcher<T[]>> supplier =
-       new SerializableSupplier<Matcher<T[]>>() {
-         @Override
-         public Matcher<T[]> get() {
-           return Matchers.arrayContaining(codedItems.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to
- * {@link Matchers#arrayContaining(Matcher[])}.
- */
- @SafeVarargs
- public static <T> SerializableMatcher<T[]>
- arrayContaining(final SerializableMatcher<? super T>... matchers) {
- return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
- @Override
- public Matcher<T[]> get() {
- return Matchers.<T>arrayContaining(matchers);
- }
- });
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to
- * {@link Matchers#arrayContaining(List)}.
- */
- public static <T> SerializableMatcher<T[]>
- arrayContaining(List<SerializableMatcher<? super T>> serializableMatchers) {
-
- @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
- final List<Matcher<? super T>> matchers = (List) serializableMatchers;
-
- return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
- @Override
- public Matcher<T[]> get() {
- return Matchers.arrayContaining(matchers);
- }
- });
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to
- * {@link Matchers#arrayContainingInAnyOrder(Object[])}.
- */
- @SafeVarargs
- public static <T extends Serializable> SerializableMatcher<T[]>
- arrayContainingInAnyOrder(final T... items) {
-
- return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
- @Override
- public Matcher<T[]> get() {
- return Matchers.arrayContainingInAnyOrder(items);
- }
- });
- }
-
- /**
-  * A {@link SerializableMatcher} with identical criteria to
-  * {@link Matchers#arrayContainingInAnyOrder(Object[])}.
-  *
-  * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-  * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-  *
-  * @param coder the coder used for the items
-  * @param items the expected array elements, in any order
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(Coder<T> coder, T... items) {
-
-   final SerializableSupplier<T[]> itemsSupplier =
-       new SerializableArrayViaCoder<>(coder, items);
-
-   return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-     @Override
-     public Matcher<T[]> get() {
-       // Delegate to the order-insensitive hamcrest matcher; this previously called
-       // Matchers.arrayContaining, which wrongly enforced element order.
-       return Matchers.arrayContainingInAnyOrder(itemsSupplier.get());
-     }
-   });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#arrayContainingInAnyOrder(Matcher[])}.
-  *
-  * @param matchers the element matchers handed to {@code Matchers.arrayContainingInAnyOrder}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(
-     final SerializableMatcher<? super T>... matchers) {
-   final SerializableSupplier<Matcher<T[]>> supplier =
-       new SerializableSupplier<Matcher<T[]>>() {
-         @Override
-         public Matcher<T[]> get() {
-           return Matchers.<T>arrayContainingInAnyOrder(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#arrayContainingInAnyOrder(Collection)}.
-  *
-  * @param serializableMatchers the element matchers handed to
-  *     {@code Matchers.arrayContainingInAnyOrder}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(
-     Collection<SerializableMatcher<? super T>> serializableMatchers) {
-
-   @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-   final Collection<Matcher<? super T>> matchers = (Collection) serializableMatchers;
-
-   final SerializableSupplier<Matcher<T[]>> supplier =
-       new SerializableSupplier<Matcher<T[]>>() {
-         @Override
-         public Matcher<T[]> get() {
-           return Matchers.arrayContainingInAnyOrder(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#arrayWithSize(int)}.
-  *
-  * @param size the array length handed to {@code Matchers.arrayWithSize}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T[]> arrayWithSize(final int size) {
-   final SerializableSupplier<Matcher<T[]>> supplier =
-       new SerializableSupplier<Matcher<T[]>>() {
-         @Override
-         public Matcher<T[]> get() {
-           return Matchers.arrayWithSize(size);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#arrayWithSize(Matcher)}.
-  *
-  * @param sizeMatcher the matcher handed to {@code Matchers.arrayWithSize}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T[]> arrayWithSize(
-     final SerializableMatcher<? super Integer> sizeMatcher) {
-   final SerializableSupplier<Matcher<T[]>> supplier =
-       new SerializableSupplier<Matcher<T[]>>() {
-         @Override
-         public Matcher<T[]> get() {
-           return Matchers.arrayWithSize(sizeMatcher);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#closeTo(double,double)}.
-  *
-  * @param target the first argument handed to {@code Matchers.closeTo}
-  * @param error the second argument handed to {@code Matchers.closeTo}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static SerializableMatcher<Double> closeTo(final double target, final double error) {
-   final SerializableSupplier<Matcher<Double>> supplier =
-       new SerializableSupplier<Matcher<Double>>() {
-         @Override
-         public Matcher<Double> get() {
-           return Matchers.closeTo(target, error);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#contains(Object[])}.
-  *
-  * @param items the expected elements; captured by the returned matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>> contains(
-     final T... items) {
-   final SerializableSupplier<Matcher<Iterable<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-         @Override
-         public Matcher<Iterable<? extends T>> get() {
-           return Matchers.contains(items);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * A {@link SerializableMatcher} with identical criteria to
-  * {@link Matchers#contains(Object[])}.
-  *
-  * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-  * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-  *
-  * @param coder the coder used for the items
-  * @param items the expected elements, in order
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<Iterable<? extends T>>
-     contains(Coder<T> coder, T... items) {
-
-   final SerializableSupplier<T[]> itemsSupplier =
-       new SerializableArrayViaCoder<>(coder, items);
-
-   return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-     @Override
-     public Matcher<Iterable<? extends T>> get() {
-       // Delegate to the order-sensitive hamcrest matcher; this previously called
-       // Matchers.containsInAnyOrder, which wrongly accepted any element order.
-       return Matchers.contains(itemsSupplier.get());
-     }
-   });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#contains(Matcher[])}.
-  *
-  * @param matchers the per-element matchers handed to {@code Matchers.contains}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<Iterable<? extends T>> contains(
-     final SerializableMatcher<? super T>... matchers) {
-   final SerializableSupplier<Matcher<Iterable<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-         @Override
-         public Matcher<Iterable<? extends T>> get() {
-           return Matchers.<T>contains(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * A {@link SerializableMatcher} with identical criteria to
-  * {@link Matchers#contains(List)}.
-  *
-  * <p>Only the supplied matchers are retained, so {@code T} itself does not need to be
-  * {@link Serializable}; this agrees with the other matcher-based overloads in this class.
-  *
-  * @param serializableMatchers the per-element matchers handed to {@code Matchers.contains}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<? extends T>> contains(
-     List<SerializableMatcher<? super T>> serializableMatchers) {
-
-   @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-   final List<Matcher<? super T>> matchers = (List) serializableMatchers;
-
-   return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-     @Override
-     public Matcher<Iterable<? extends T>> get() {
-       return Matchers.contains(matchers);
-     }
-   });
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to
- * {@link Matchers#containsInAnyOrder(Object[])}.
- */
- @SafeVarargs
- public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>>
- containsInAnyOrder(final T... items) {
- return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
- @Override
- public Matcher<Iterable<? extends T>> get() {
- return Matchers.containsInAnyOrder(items);
- }
- });
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to
- * {@link Matchers#containsInAnyOrder(Object[])}.
- *
- * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
- * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
- */
- @SafeVarargs
- public static <T> SerializableMatcher<Iterable<? extends T>>
- containsInAnyOrder(Coder<T> coder, T... items) {
-
- final SerializableSupplier<T[]> itemsSupplier =
- new SerializableArrayViaCoder<>(coder, items);
-
- return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
- @Override
- public Matcher<Iterable<? extends T>> get() {
- return Matchers.containsInAnyOrder(itemsSupplier.get());
- }
- });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#containsInAnyOrder(Matcher[])}.
-  *
-  * @param matchers the element matchers handed to {@code Matchers.containsInAnyOrder}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<Iterable<? extends T>> containsInAnyOrder(
-     final SerializableMatcher<? super T>... matchers) {
-   final SerializableSupplier<Matcher<Iterable<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-         @Override
-         public Matcher<Iterable<? extends T>> get() {
-           return Matchers.<T>containsInAnyOrder(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#containsInAnyOrder(Collection)}.
-  *
-  * @param serializableMatchers the element matchers handed to
-  *     {@code Matchers.containsInAnyOrder}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<? extends T>> containsInAnyOrder(
-     Collection<SerializableMatcher<? super T>> serializableMatchers) {
-
-   @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-   final Collection<Matcher<? super T>> matchers = (Collection) serializableMatchers;
-
-   final SerializableSupplier<Matcher<Iterable<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-         @Override
-         public Matcher<Iterable<? extends T>> get() {
-           return Matchers.containsInAnyOrder(matchers);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#containsString}.
-  *
-  * @param substring the string handed to {@code Matchers.containsString}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static SerializableMatcher<String> containsString(final String substring) {
-   final SerializableSupplier<Matcher<String>> supplier =
-       new SerializableSupplier<Matcher<String>>() {
-         @Override
-         public Matcher<String> get() {
-           return Matchers.containsString(substring);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#empty()}.
-  *
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Collection<? extends T>> empty() {
-   final SerializableSupplier<Matcher<Collection<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-         @Override
-         public Matcher<Collection<? extends T>> get() {
-           return Matchers.empty();
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#emptyArray()}.
-  *
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T[]> emptyArray() {
-   final SerializableSupplier<Matcher<T[]>> supplier =
-       new SerializableSupplier<Matcher<T[]>>() {
-         @Override
-         public Matcher<T[]> get() {
-           return Matchers.emptyArray();
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#emptyIterable()}.
-  *
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<? extends T>> emptyIterable() {
-   final SerializableSupplier<Matcher<Iterable<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-         @Override
-         public Matcher<Iterable<? extends T>> get() {
-           return Matchers.emptyIterable();
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#endsWith}.
-  *
-  * @param substring the string handed to {@code Matchers.endsWith}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static SerializableMatcher<String> endsWith(final String substring) {
-   final SerializableSupplier<Matcher<String>> supplier =
-       new SerializableSupplier<Matcher<String>>() {
-         @Override
-         public Matcher<String> get() {
-           return Matchers.endsWith(substring);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#equalTo(Object)}.
-  *
-  * @param expected the value handed to {@code Matchers.equalTo}; captured by the returned
-  *     matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Serializable> SerializableMatcher<T> equalTo(final T expected) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.equalTo(expected);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#equalTo(Object)}.
-  *
-  * <p>The expected value of type {@code T} will be serialized using the provided {@link Coder}.
-  * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-  *
-  * @param coder the coder used for the expected value
-  * @param expected the value handed to {@code Matchers.equalTo}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T> equalTo(Coder<T> coder, T expected) {
-   final SerializableSupplier<T> codedExpected = new SerializableViaCoder<>(coder, expected);
-
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.equalTo(codedExpected.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to {@link Matchers#greaterThan()}.
- */
- public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
- greaterThan(final T target) {
- return fromSupplier(new SerializableSupplier<Matcher<T>>() {
- @Override
- public Matcher<T> get() {
- return Matchers.greaterThan(target);
- }
- });
- }
-
- /**
-  * A {@link SerializableMatcher} with identical criteria to
-  * {@link Matchers#greaterThan(Comparable)}.
-  *
-  * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-  * It is explicitly <i>not</i> required or expected to be serializable via Java serialization,
-  * so {@code T} is bounded only by {@link Comparable}, matching the coder-based
-  * {@code lessThan} overload.
-  *
-  * @param coder the coder used for the target value
-  * @param target the value handed to {@code Matchers.greaterThan}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Comparable<T>> SerializableMatcher<T>
-     greaterThan(final Coder<T> coder, T target) {
-   final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-   return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-     @Override
-     public Matcher<T> get() {
-       return Matchers.greaterThan(targetSupplier.get());
-     }
-   });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#greaterThanOrEqualTo(Comparable)}.
-  *
-  * <p>NOTE(review): unlike the other value-capturing overloads, {@code T} is not bounded by
-  * {@code Serializable} here although {@code target} is captured by the supplier; presumably
-  * callers must still pass a serializable value. Confirm before relying on this.
-  *
-  * @param target the value handed to {@code Matchers.greaterThanOrEqualTo}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Comparable<T>> SerializableMatcher<T> greaterThanOrEqualTo(
-     final T target) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.greaterThanOrEqualTo(target);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * A {@link SerializableMatcher} with identical criteria to
-  * {@link Matchers#greaterThanOrEqualTo(Comparable)}.
-  *
-  * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-  * It is explicitly <i>not</i> required or expected to be serializable via Java serialization,
-  * so {@code T} is bounded only by {@link Comparable}, matching the coder-based
-  * {@code lessThanOrEqualTo} overload.
-  *
-  * @param coder the coder used for the target value
-  * @param target the value handed to {@code Matchers.greaterThanOrEqualTo}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Comparable<T>> SerializableMatcher<T>
-     greaterThanOrEqualTo(final Coder<T> coder, T target) {
-   final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-   return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-     @Override
-     public Matcher<T> get() {
-       return Matchers.greaterThanOrEqualTo(targetSupplier.get());
-     }
-   });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#hasItem(Object)}.
-  *
-  * @param target the item handed to {@code Matchers.hasItem}; captured by the returned
-  *     matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Serializable> SerializableMatcher<Iterable<? super T>> hasItem(
-     final T target) {
-   final SerializableSupplier<Matcher<Iterable<? super T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-         @Override
-         public Matcher<Iterable<? super T>> get() {
-           return Matchers.hasItem(target);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#hasItem(Object)}.
-  *
-  * <p>The item of type {@code T} will be serialized using the provided {@link Coder}.
-  * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-  *
-  * @param coder the coder used for the item
-  * @param target the item handed to {@code Matchers.hasItem}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<? super T>> hasItem(Coder<T> coder, T target) {
-   final SerializableSupplier<T> codedTarget = new SerializableViaCoder<>(coder, target);
-   final SerializableSupplier<Matcher<Iterable<? super T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-         @Override
-         public Matcher<Iterable<? super T>> get() {
-           return Matchers.hasItem(codedTarget.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#hasItem(Matcher)}.
-  *
-  * @param matcher the item matcher handed to {@code Matchers.hasItem}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<? super T>> hasItem(
-     final SerializableMatcher<? super T> matcher) {
-   final SerializableSupplier<Matcher<Iterable<? super T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-         @Override
-         public Matcher<Iterable<? super T>> get() {
-           return Matchers.hasItem(matcher);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#hasSize(int)}.
-  *
-  * @param size the collection size handed to {@code Matchers.hasSize}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Collection<? extends T>> hasSize(final int size) {
-   final SerializableSupplier<Matcher<Collection<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-         @Override
-         public Matcher<Collection<? extends T>> get() {
-           return Matchers.hasSize(size);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#hasSize(Matcher)}.
-  *
-  * @param sizeMatcher the matcher handed to {@code Matchers.hasSize}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Collection<? extends T>> hasSize(
-     final SerializableMatcher<? super Integer> sizeMatcher) {
-   final SerializableSupplier<Matcher<Collection<? extends T>>> supplier =
-       new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-         @Override
-         public Matcher<Collection<? extends T>> get() {
-           return Matchers.hasSize(sizeMatcher);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#iterableWithSize(int)}.
-  *
-  * @param size the size handed to {@code Matchers.iterableWithSize}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<T>> iterableWithSize(final int size) {
-   final SerializableSupplier<Matcher<Iterable<T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<T>>>() {
-         @Override
-         public Matcher<Iterable<T>> get() {
-           return Matchers.iterableWithSize(size);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#iterableWithSize(Matcher)}.
-  *
-  * @param sizeMatcher the matcher handed to {@code Matchers.iterableWithSize}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<Iterable<T>> iterableWithSize(
-     final SerializableMatcher<? super Integer> sizeMatcher) {
-   final SerializableSupplier<Matcher<Iterable<T>>> supplier =
-       new SerializableSupplier<Matcher<Iterable<T>>>() {
-         @Override
-         public Matcher<Iterable<T>> get() {
-           return Matchers.iterableWithSize(sizeMatcher);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Collection)}.
- */
- public static <T extends Serializable> SerializableMatcher<T>
- isIn(final Collection<T> collection) {
- return fromSupplier(new SerializableSupplier<Matcher<T>>() {
- @Override
- public Matcher<T> get() {
- return Matchers.isIn(collection);
- }
- });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#isIn(Object[])} to the elements of the given collection.
-  *
-  * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-  * They are explicitly <i>not</i> required or expected to be serializable via Java
-  * serialization.
-  *
-  * @param coder the coder used for the items
-  * @param collection the candidate values; copied into an array via {@code toArray()}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T> isIn(Coder<T> coder, Collection<T> collection) {
-   @SuppressWarnings("unchecked")
-   T[] items = (T[]) collection.toArray();
-   final SerializableSupplier<T[]> codedItems = new SerializableArrayViaCoder<>(coder, items);
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.isIn(codedItems.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#isIn(Object[])}.
-  *
-  * @param items the array handed to {@code Matchers.isIn}; captured by the returned
-  *     matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Serializable> SerializableMatcher<T> isIn(final T[] items) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.isIn(items);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#isIn(Object[])}.
-  *
-  * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-  * They are explicitly <i>not</i> required or expected to be serializable via Java
-  * serialization.
-  *
-  * @param coder the coder used for the items
-  * @param items the candidate values
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T> isIn(Coder<T> coder, T[] items) {
-   final SerializableSupplier<T[]> codedItems = new SerializableArrayViaCoder<>(coder, items);
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.isIn(codedItems.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#isOneOf}.
-  *
-  * @param elems the values handed to {@code Matchers.isOneOf}; captured by the returned
-  *     matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T extends Serializable> SerializableMatcher<T> isOneOf(final T... elems) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.isOneOf(elems);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#isOneOf}.
-  *
-  * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-  * They are explicitly <i>not</i> required or expected to be serializable via Java
-  * serialization.
-  *
-  * @param coder the coder used for the items
-  * @param items the candidate values
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- @SafeVarargs
- public static <T> SerializableMatcher<T> isOneOf(Coder<T> coder, T... items) {
-   final SerializableSupplier<T[]> codedItems = new SerializableArrayViaCoder<>(coder, items);
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.isOneOf(codedItems.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} that matches any {@link KV} with the specified key.
- */
- public static <K extends Serializable, V> SerializableMatcher<KV<? extends K, ? extends V>>
- kvWithKey(K key) {
- return new KvKeyMatcher<K, V>(equalTo(key));
- }
-
- /**
- * A {@link SerializableMatcher} that matches any {@link KV} with the specified key.
- *
- * <p>The key of type {@code K} will be serialized using the provided {@link Coder}.
- * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
- */
- public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>>
- kvWithKey(Coder<K> coder, K key) {
- return new KvKeyMatcher<K, V>(equalTo(coder, key));
- }
-
- /**
-  * Builds a {@link SerializableMatcher} accepting any {@link KV} whose key satisfies the given
-  * matcher.
-  *
-  * @param keyMatcher the matcher applied to the key
-  * @return a matcher that inspects only the key of a {@link KV}
-  */
- public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kvWithKey(
-     final SerializableMatcher<? super K> keyMatcher) {
-   final KvKeyMatcher<K, V> byKey = new KvKeyMatcher<K, V>(keyMatcher);
-   return byKey;
- }
-
- /**
- * A {@link SerializableMatcher} that matches any {@link KV} with the specified value.
- */
- public static <K, V extends Serializable> SerializableMatcher<KV<? extends K, ? extends V>>
- kvWithValue(V value) {
- return new KvValueMatcher<K, V>(equalTo(value));
- }
-
- /**
- * A {@link SerializableMatcher} that matches any {@link KV} with the specified value.
- *
- * <p>The value of type {@code V} will be serialized using the provided {@link Coder}.
- * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
- */
- public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>>
- kvWithValue(Coder<V> coder, V value) {
- return new KvValueMatcher<K, V>(equalTo(coder, value));
- }
-
- /**
-  * Builds a {@link SerializableMatcher} accepting any {@link KV} whose value satisfies the
-  * given matcher.
-  *
-  * @param valueMatcher the matcher applied to the value
-  * @return a matcher that inspects only the value of a {@link KV}
-  */
- public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kvWithValue(
-     final SerializableMatcher<? super V> valueMatcher) {
-   final KvValueMatcher<K, V> byValue = new KvValueMatcher<K, V>(valueMatcher);
-   return byValue;
- }
-
- /**
-  * Builds a {@link SerializableMatcher} accepting any {@link KV} whose key and value both
-  * satisfy the given matchers.
-  *
-  * <p>The result is {@code allOf} applied to a key-only matcher and a value-only matcher.
-  *
-  * @param keyMatcher the matcher applied to the key
-  * @param valueMatcher the matcher applied to the value
-  * @return a matcher requiring both the key and the value to match
-  */
- public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kv(
-     final SerializableMatcher<? super K> keyMatcher,
-     final SerializableMatcher<? super V> valueMatcher) {
-
-   final SerializableMatcher<KV<? extends K, ? extends V>> keyPart =
-       SerializableMatchers.<K, V>kvWithKey(keyMatcher);
-   final SerializableMatcher<KV<? extends K, ? extends V>> valuePart =
-       SerializableMatchers.<K, V>kvWithValue(valueMatcher);
-
-   return SerializableMatchers.<KV<? extends K, ? extends V>>allOf(keyPart, valuePart);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#lessThan(Comparable)}.
-  *
-  * @param target the value handed to {@code Matchers.lessThan}; captured by the returned
-  *     matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Comparable<T> & Serializable> SerializableMatcher<T> lessThan(
-     final T target) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.lessThan(target);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
- * A {@link SerializableMatcher} with identical criteria to {@link Matchers#lessThan()}.
- *
- * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
- * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
- */
- public static <T extends Comparable<T>> SerializableMatcher<T>
- lessThan(Coder<T> coder, T target) {
- final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
- return fromSupplier(new SerializableSupplier<Matcher<T>>() {
- @Override
- public Matcher<T> get() {
- return Matchers.lessThan(targetSupplier.get());
- }
- });
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#lessThanOrEqualTo(Comparable)}.
-  *
-  * @param target the value handed to {@code Matchers.lessThanOrEqualTo}; captured by the
-  *     returned matcher's supplier
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Comparable<T> & Serializable> SerializableMatcher<T> lessThanOrEqualTo(
-     final T target) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.lessThanOrEqualTo(target);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of
-  * {@link Matchers#lessThanOrEqualTo(Comparable)}.
-  *
-  * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-  * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-  *
-  * @param coder the coder used for the target value
-  * @param target the value handed to {@code Matchers.lessThanOrEqualTo}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T extends Comparable<T>> SerializableMatcher<T> lessThanOrEqualTo(
-     Coder<T> coder, T target) {
-   final SerializableSupplier<T> codedTarget = new SerializableViaCoder<>(coder, target);
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.lessThanOrEqualTo(codedTarget.get());
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#not}.
-  *
-  * @param matcher the matcher handed to {@code Matchers.not}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static <T> SerializableMatcher<T> not(final SerializableMatcher<T> matcher) {
-   final SerializableSupplier<Matcher<T>> supplier =
-       new SerializableSupplier<Matcher<T>>() {
-         @Override
-         public Matcher<T> get() {
-           return Matchers.not(matcher);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#nullValue}.
-  *
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static SerializableMatcher<Object> nullValue() {
-   final SerializableSupplier<Matcher<Object>> supplier =
-       new SerializableSupplier<Matcher<Object>>() {
-         @Override
-         public Matcher<Object> get() {
-           return Matchers.nullValue();
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Builds a {@link SerializableMatcher} applying the criteria of {@link Matchers#startsWith}.
-  *
-  * @param substring the string handed to {@code Matchers.startsWith}
-  * @return a serializable matcher wrapping the hamcrest matcher
-  */
- public static SerializableMatcher<String> startsWith(final String substring) {
-   final SerializableSupplier<Matcher<String>> supplier =
-       new SerializableSupplier<Matcher<String>>() {
-         @Override
-         public Matcher<String> get() {
-           return Matchers.startsWith(substring);
-         }
-       };
-   return fromSupplier(supplier);
- }
-
- /**
-  * Matches a {@link KV} by applying a {@link SerializableMatcher} to its key; the value is
-  * not inspected.
-  *
-  * <p>Any item that is not a {@link KV}, including {@code null}, is reported as a mismatch
-  * rather than failing with a {@link ClassCastException} or {@link NullPointerException}.
-  */
- private static class KvKeyMatcher<K, V>
-     extends BaseMatcher<KV<? extends K, ? extends V>>
-     implements SerializableMatcher<KV<? extends K, ? extends V>> {
-   private final SerializableMatcher<? super K> keyMatcher;
-
-   public KvKeyMatcher(SerializableMatcher<? super K> keyMatcher) {
-     this.keyMatcher = keyMatcher;
-   }
-
-   @Override
-   public boolean matches(Object item) {
-     // matches() accepts any Object, so check the type instead of casting blindly.
-     if (!(item instanceof KV)) {
-       return false;
-     }
-     return keyMatcher.matches(((KV<?, ?>) item).getKey());
-   }
-
-   @Override
-   public void describeMismatch(Object item, Description mismatchDescription) {
-     if (!(item instanceof KV)) {
-       // Not a KV at all: use BaseMatcher's generic mismatch description.
-       super.describeMismatch(item, mismatchDescription);
-       return;
-     }
-     Object key = ((KV<?, ?>) item).getKey();
-     if (!keyMatcher.matches(key)) {
-       mismatchDescription.appendText("key did not match: ");
-       keyMatcher.describeMismatch(key, mismatchDescription);
-     }
-   }
-
-   @Override
-   public void describeTo(Description description) {
-     description.appendText("KV with key matching ");
-     keyMatcher.describeTo(description);
-   }
-
-   @Override
-   public String toString() {
-     return MoreObjects.toStringHelper(this)
-         .addValue(keyMatcher)
-         .toString();
-   }
- }
-
- private static class KvValueMatcher<K, V>
- extends BaseMatcher<KV<? extends K, ? extends V>>
- implements SerializableMatcher<KV<? extends K, ? extends V>> {
- private final SerializableMatcher<? super V> valueMatcher;
-
- public KvValueMatcher(SerializableMatcher<? super V> valueMatcher) {
- this.valueMatcher = valueMatcher;
- }
-
- @Override
- public boolean matches(Object item) {
- @SuppressWarnings("unchecked")
- KV<?, V> kvItem = (KV<?, V>) item;
- return valueMatcher.matches(kvItem.getValue());
- }
-
- @Override
- public void describeMismatch(Object item, Description mismatchDescription) {
- @SuppressWarnings("unchecked")
- KV<?, V> kvItem = (KV<?, V>) item;
- if (!valueMatcher.matches(kvItem.getValue())) {
- mismatchDescription.appendText("value did not match: ");
- valueMatcher.describeMismatch(kvItem.getValue(), mismatchDescription);
- }
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("KV with value matching ");
- valueMatcher.describeTo(description);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .addValue(valueMatcher)
- .toString();
- }
- }
-
- /**
-  * Builds a {@link SerializableMatcher} on top of a {@link Matcher} that need not itself be
-  * serializable, by routing every call through a {@link SerializableSupplier}.
-  *
-  * <p>To wrap a non-serializable {@link Matcher}, pass a {@link SerializableSupplier} whose
-  * {@link SerializableSupplier#get() get()} returns a fresh instance of the desired
-  * {@link Matcher}. The returned {@link SerializableMatcher} behaves like whatever
-  * {@link SerializableSupplier#get() get()} returns at the moment it is invoked during
-  * matching (which may occur on another machine, such as a Dataflow worker).
-  *
-  * <pre><code>
-  * return fromSupplier(new SerializableSupplier&lt;Matcher&lt;T&gt;&gt;() {
-  *   &#64;Override
-  *   public Matcher&lt;T&gt; get() {
-  *     return new MyMatcherForT();
-  *   }
-  * });
-  * </code></pre>
-  */
- public static <T> SerializableMatcher<T> fromSupplier(
-     SerializableSupplier<Matcher<T>> supplier) {
-   SerializableMatcher<T> wrapped = new SerializableMatcherFromSupplier<T>(supplier);
-   return wrapped;
- }
-
- /**
- * Supplies values of type {@code T}, and is serializable. Thus, even if {@code T} is not
- * serializable, the supplier can be serialized and provide a {@code T} wherever it is
- * deserialized.
- *
- * @param <T> the type of value supplied.
- */
- public interface SerializableSupplier<T> extends Serializable {
- /** Returns the value of type {@code T} that this supplier provides. */
- T get();
- }
-
- /**
-  * A {@link SerializableMatcher} that holds a {@link SerializableSupplier} of the real
-  * {@link Matcher} rather than the matcher itself, since the delegate {@link Matcher} is not
-  * generally serializable.
-  *
-  * <p>Each of {@link #matches}, {@link #describeTo} and {@link #describeMismatch} asks the
-  * supplier for a matcher and forwards the call to it.
-  */
- private static class SerializableMatcherFromSupplier<T> extends BaseMatcher<T>
-     implements SerializableMatcher<T> {
-
-   // Assigned once in the constructor; final for consistency with the other matchers here.
-   private final SerializableSupplier<Matcher<T>> supplier;
-
-   public SerializableMatcherFromSupplier(SerializableSupplier<Matcher<T>> supplier) {
-     this.supplier = supplier;
-   }
-
-   @Override
-   public void describeTo(Description description) {
-     supplier.get().describeTo(description);
-   }
-
-   @Override
-   public boolean matches(Object item) {
-     return supplier.get().matches(item);
-   }
-
-   @Override
-   public void describeMismatch(Object item, Description mismatchDescription) {
-     supplier.get().describeMismatch(item, mismatchDescription);
-   }
- }
-
- /**
- * Wraps any value that can be encoded via a {@link Coder} to make it {@link Serializable}.
- * This is not likely to be a good encoding, so should be used only for tests, where data
- * volume is small and minor costs are not critical.
- */
- private static class SerializableViaCoder<T> implements SerializableSupplier<T> {
- /** Cached value that is not serialized. */
- @Nullable
- private transient T value;
-
- /** The bytes of {@link #value} when encoded via {@link #coder}. */
- private byte[] encodedValue;
-
- private Coder<T> coder;
-
- public SerializableViaCoder(Coder<T> coder, T value) {
- this.coder = coder;
- this.value = value;
- try {
- this.encodedValue = CoderUtils.encodeToByteArray(coder, value);
- } catch (CoderException exc) {
- throw new RuntimeException("Error serializing via Coder", exc);
- }
- }
-
- @Override
- public T get() {
- if (value == null) {
- try {
- value = CoderUtils.decodeFromByteArray(coder, encodedValue);
- } catch (CoderException exc) {
- throw new RuntimeException("Error deserializing via Coder", exc);
- }
- }
- return value;
- }
- }
-
- /**
-  * A {@link SerializableSupplier} for an array whose elements can be encoded via a
-  * {@link Coder}; the array is stored as the bytes of a list encoding, which makes it
-  * {@link Serializable} by proxy. The encoding is not likely to be a good one, so this is
-  * meant only for tests, where data volume is small and minor costs are not critical.
-  *
-  * <p>NOTE(review): an encoding failure in the constructor is rethrown through
-  * {@code UserCodeException.wrap}, while a decoding failure in {@link #get()} is rethrown as a
-  * plain {@link RuntimeException}; consider unifying the two.
-  */
- private static class SerializableArrayViaCoder<T> implements SerializableSupplier<T[]> {
-   /** Cached array; transient, so it is rebuilt from {@link #encodedValue} when absent. */
-   @Nullable
-   private transient T[] value;
-
-   /** The bytes of {@link #value}, viewed as a list, when encoded via {@link #coder}. */
-   private byte[] encodedValue;
-
-   private Coder<List<T>> coder;
-
-   public SerializableArrayViaCoder(Coder<T> elementCoder, T[] value) {
-     Coder<List<T>> listCoder = ListCoder.of(elementCoder);
-     this.coder = listCoder;
-     this.value = value;
-     List<T> elements = Arrays.asList(value);
-     try {
-       this.encodedValue = CoderUtils.encodeToByteArray(listCoder, elements);
-     } catch (CoderException exc) {
-       throw UserCodeException.wrap(exc);
-     }
-   }
-
-   /** Returns the cached array, decoding it from the stored bytes first if it is absent. */
-   @Override
-   public T[] get() {
-     if (value != null) {
-       return value;
-     }
-     try {
-       List<T> elements = CoderUtils.decodeFromByteArray(coder, encodedValue);
-       // NOTE(review): the no-arg toArray() is specified to return Object[], so this cast
-       // presumably holds only while callers use the result as an erased T[] -- confirm.
-       @SuppressWarnings("unchecked")
-       T[] decoded = (T[]) elements.toArray();
-       value = decoded;
-     } catch (CoderException exc) {
-       throw new RuntimeException("Error deserializing via Coder", exc);
-     }
-     return value;
-   }
- }
-}