You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:26 UTC
[02/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/PipelineTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/PipelineTest.java
deleted file mode 100644
index e311252..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/PipelineTest.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for Pipeline.
- */
-@RunWith(JUnit4.class)
-public class PipelineTest {
-
- @Rule public ExpectedLogs logged = ExpectedLogs.none(Pipeline.class);
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- static class PipelineWrapper extends Pipeline {
- protected PipelineWrapper(PipelineRunner<?> runner) {
- super(runner, PipelineOptionsFactory.create());
- }
- }
-
- // Mock class that throws a user code exception during the call to
- // Pipeline.run().
- static class TestPipelineRunnerThrowingUserException
- extends PipelineRunner<PipelineResult> {
- @Override
- public PipelineResult run(Pipeline pipeline) {
- Throwable t = new IllegalStateException("user code exception");
- throw UserCodeException.wrap(t);
- }
- }
-
- // Mock class that throws an SDK or API client code exception during
- // the call to Pipeline.run().
- static class TestPipelineRunnerThrowingSDKException
- extends PipelineRunner<PipelineResult> {
- @Override
- public PipelineResult run(Pipeline pipeline) {
- throw new IllegalStateException("SDK exception");
- }
- }
-
- @Test
- public void testPipelineUserExceptionHandling() {
- Pipeline p = new PipelineWrapper(
- new TestPipelineRunnerThrowingUserException());
-
- // Check pipeline runner correctly catches user errors.
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
- thrown.expectMessage("user code exception");
- p.run();
- }
-
- @Test
- public void testPipelineSDKExceptionHandling() {
- Pipeline p = new PipelineWrapper(new TestPipelineRunnerThrowingSDKException());
-
- // Check pipeline runner correctly catches SDK errors.
- try {
- p.run();
- fail("Should have thrown an exception.");
- } catch (RuntimeException exn) {
- // Make sure the exception isn't a UserCodeException.
- Assert.assertThat(exn, not(instanceOf(UserCodeException.class)));
- // Assert that the message is correct.
- Assert.assertThat(exn.getMessage(), containsString("SDK exception"));
- // RuntimeException should be IllegalStateException.
- Assert.assertThat(exn, instanceOf(IllegalStateException.class));
- }
- }
-
- @Test
- @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
- public void testMultipleApply() {
- PTransform<PCollection<? extends String>, PCollection<String>> myTransform =
- addSuffix("+");
-
- Pipeline p = TestPipeline.create();
- PCollection<String> input = p.apply(Create.<String>of(ImmutableList.of("a", "b")));
-
- PCollection<String> left = input.apply("Left1", myTransform).apply("Left2", myTransform);
- PCollection<String> right = input.apply("Right", myTransform);
-
- PCollection<String> both = PCollectionList.of(left).and(right)
- .apply(Flatten.<String>pCollections());
-
- DataflowAssert.that(both).containsInAnyOrder("a++", "b++", "a+", "b+");
-
- p.run();
- }
-
- private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
- final String suffix) {
- return ParDo.of(new DoFn<String, String>() {
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) {
- c.output(c.element() + suffix);
- }
- });
- }
-
- @Test
- public void testToString() {
- PipelineOptions options = PipelineOptionsFactory.as(PipelineOptions.class);
- options.setRunner(DirectPipelineRunner.class);
- Pipeline pipeline = Pipeline.create(options);
- assertEquals("Pipeline#" + pipeline.hashCode(), pipeline.toString());
- }
-
- @Test
- public void testStableUniqueNameOff() {
- Pipeline p = TestPipeline.create();
- p.getOptions().setStableUniqueNames(CheckEnabled.OFF);
-
- p.apply(Create.of(5, 6, 7));
- p.apply(Create.of(5, 6, 7));
-
- logged.verifyNotLogged("does not have a stable unique name.");
- }
-
- @Test
- public void testStableUniqueNameWarning() {
- Pipeline p = TestPipeline.create();
- p.getOptions().setStableUniqueNames(CheckEnabled.WARNING);
-
- p.apply(Create.of(5, 6, 7));
- p.apply(Create.of(5, 6, 7));
-
- logged.verifyWarn("does not have a stable unique name.");
- }
-
- @Test
- public void testStableUniqueNameError() {
- Pipeline p = TestPipeline.create();
- p.getOptions().setStableUniqueNames(CheckEnabled.ERROR);
-
- p.apply(Create.of(5, 6, 7));
-
- thrown.expectMessage("does not have a stable unique name.");
- p.apply(Create.of(5, 6, 7));
- }
-
- /**
- * Tests that Pipeline supports a pass-through identity function.
- */
- @Test
- @Category(RunnableOnService.class)
- public void testIdentityTransform() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- PCollection<Integer> output = pipeline
- .apply(Create.<Integer>of(1, 2, 3, 4))
- .apply("IdentityTransform", new IdentityTransform<PCollection<Integer>>());
-
- DataflowAssert.that(output).containsInAnyOrder(1, 2, 3, 4);
- pipeline.run();
- }
-
- private static class IdentityTransform<T extends PInput & POutput>
- extends PTransform<T, T> {
- @Override
- public T apply(T input) {
- return input;
- }
- }
-
- /**
- * Tests that Pipeline supports pulling an element out of a tuple as a transform.
- */
- @Test
- @Category(RunnableOnService.class)
- public void testTupleProjectionTransform() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- PCollection<Integer> input = pipeline
- .apply(Create.<Integer>of(1, 2, 3, 4));
-
- TupleTag<Integer> tag = new TupleTag<Integer>();
- PCollectionTuple tuple = PCollectionTuple.of(tag, input);
-
- PCollection<Integer> output = tuple
- .apply("ProjectTag", new TupleProjectionTransform<Integer>(tag));
-
- DataflowAssert.that(output).containsInAnyOrder(1, 2, 3, 4);
- pipeline.run();
- }
-
- private static class TupleProjectionTransform<T>
- extends PTransform<PCollectionTuple, PCollection<T>> {
- private TupleTag<T> tag;
-
- public TupleProjectionTransform(TupleTag<T> tag) {
- this.tag = tag;
- }
-
- @Override
- public PCollection<T> apply(PCollectionTuple input) {
- return input.get(tag);
- }
- }
-
- /**
- * Tests that Pipeline supports putting an element into a tuple as a transform.
- */
- @Test
- @Category(RunnableOnService.class)
- public void testTupleInjectionTransform() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- PCollection<Integer> input = pipeline
- .apply(Create.<Integer>of(1, 2, 3, 4));
-
- TupleTag<Integer> tag = new TupleTag<Integer>();
-
- PCollectionTuple output = input
- .apply("ProjectTag", new TupleInjectionTransform<Integer>(tag));
-
- DataflowAssert.that(output.get(tag)).containsInAnyOrder(1, 2, 3, 4);
- pipeline.run();
- }
-
- private static class TupleInjectionTransform<T>
- extends PTransform<PCollection<T>, PCollectionTuple> {
- private TupleTag<T> tag;
-
- public TupleInjectionTransform(TupleTag<T> tag) {
- this.tag = tag;
- }
-
- @Override
- public PCollectionTuple apply(PCollection<T> input) {
- return PCollectionTuple.of(tag, input);
- }
- }
-
- /**
- * Tests that an empty pipeline runs.
- */
- @Test
- public void testEmptyPipeline() throws Exception {
- Pipeline pipeline = TestPipeline.create();
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/TestUtils.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/TestUtils.java
deleted file mode 100644
index 257ecbb..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/TestUtils.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk;
-
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Utilities for tests.
- */
-public class TestUtils {
- // Do not instantiate.
- private TestUtils() {}
-
- public static final String[] NO_LINES_ARRAY = new String[] { };
-
- public static final List<String> NO_LINES = Arrays.asList(NO_LINES_ARRAY);
-
- public static final String[] LINES_ARRAY = new String[] {
- "To be, or not to be: that is the question: ",
- "Whether 'tis nobler in the mind to suffer ",
- "The slings and arrows of outrageous fortune, ",
- "Or to take arms against a sea of troubles, ",
- "And by opposing end them? To die: to sleep; ",
- "No more; and by a sleep to say we end ",
- "The heart-ache and the thousand natural shocks ",
- "That flesh is heir to, 'tis a consummation ",
- "Devoutly to be wish'd. To die, to sleep; ",
- "To sleep: perchance to dream: ay, there's the rub; ",
- "For in that sleep of death what dreams may come ",
- "When we have shuffled off this mortal coil, ",
- "Must give us pause: there's the respect ",
- "That makes calamity of so long life; ",
- "For who would bear the whips and scorns of time, ",
- "The oppressor's wrong, the proud man's contumely, ",
- "The pangs of despised love, the law's delay, ",
- "The insolence of office and the spurns ",
- "That patient merit of the unworthy takes, ",
- "When he himself might his quietus make ",
- "With a bare bodkin? who would fardels bear, ",
- "To grunt and sweat under a weary life, ",
- "But that the dread of something after death, ",
- "The undiscover'd country from whose bourn ",
- "No traveller returns, puzzles the will ",
- "And makes us rather bear those ills we have ",
- "Than fly to others that we know not of? ",
- "Thus conscience does make cowards of us all; ",
- "And thus the native hue of resolution ",
- "Is sicklied o'er with the pale cast of thought, ",
- "And enterprises of great pith and moment ",
- "With this regard their currents turn awry, ",
- "And lose the name of action.--Soft you now! ",
- "The fair Ophelia! Nymph, in thy orisons ",
- "Be all my sins remember'd." };
-
- public static final List<String> LINES = Arrays.asList(LINES_ARRAY);
-
- public static final String[] LINES2_ARRAY = new String[] {
- "hi", "there", "bob!" };
-
- public static final List<String> LINES2 = Arrays.asList(LINES2_ARRAY);
-
- public static final Integer[] NO_INTS_ARRAY = new Integer[] { };
-
- public static final List<Integer> NO_INTS = Arrays.asList(NO_INTS_ARRAY);
-
- public static final Integer[] INTS_ARRAY = new Integer[] {
- 3, 42, Integer.MAX_VALUE, 0, -1, Integer.MIN_VALUE, 666 };
-
- public static final List<Integer> INTS = Arrays.asList(INTS_ARRAY);
-
- /**
- * Matcher for KVs.
- */
- public static class KvMatcher<K, V>
- extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
- final Matcher<? super K> keyMatcher;
- final Matcher<? super V> valueMatcher;
-
- public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher,
- Matcher<V> valueMatcher) {
- return new KvMatcher<>(keyMatcher, valueMatcher);
- }
-
- public KvMatcher(Matcher<? super K> keyMatcher,
- Matcher<? super V> valueMatcher) {
- this.keyMatcher = keyMatcher;
- this.valueMatcher = valueMatcher;
- }
-
- @Override
- public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
- return keyMatcher.matches(kv.getKey())
- && valueMatcher.matches(kv.getValue());
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a KV(").appendValue(keyMatcher)
- .appendText(", ").appendValue(valueMatcher)
- .appendText(")");
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Utilities for testing CombineFns, ensuring they give correct results
- // across various permutations and shardings of the input.
-
- public static <InputT, AccumT, OutputT> void checkCombineFn(
- CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, final OutputT expected) {
- checkCombineFn(fn, input, CoreMatchers.is(expected));
- }
-
- public static <InputT, AccumT, OutputT> void checkCombineFn(
- CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) {
- checkCombineFnInternal(fn, input, matcher);
- Collections.shuffle(input);
- checkCombineFnInternal(fn, input, matcher);
- }
-
- private static <InputT, AccumT, OutputT> void checkCombineFnInternal(
- CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) {
- int size = input.size();
- checkCombineFnShards(fn, Collections.singletonList(input), matcher);
- checkCombineFnShards(fn, shardEvenly(input, 2), matcher);
- if (size > 4) {
- checkCombineFnShards(fn, shardEvenly(input, size / 2), matcher);
- checkCombineFnShards(
- fn, shardEvenly(input, (int) (size / Math.sqrt(size))), matcher);
- }
- checkCombineFnShards(fn, shardExponentially(input, 1.4), matcher);
- checkCombineFnShards(fn, shardExponentially(input, 2), matcher);
- checkCombineFnShards(fn, shardExponentially(input, Math.E), matcher);
- }
-
- public static <InputT, AccumT, OutputT> void checkCombineFnShards(
- CombineFn<InputT, AccumT, OutputT> fn,
- List<? extends Iterable<InputT>> shards,
- Matcher<? super OutputT> matcher) {
- checkCombineFnShardsInternal(fn, shards, matcher);
- Collections.shuffle(shards);
- checkCombineFnShardsInternal(fn, shards, matcher);
- }
-
- private static <InputT, AccumT, OutputT> void checkCombineFnShardsInternal(
- CombineFn<InputT, AccumT, OutputT> fn,
- Iterable<? extends Iterable<InputT>> shards,
- Matcher<? super OutputT> matcher) {
- List<AccumT> accumulators = new ArrayList<>();
- int maybeCompact = 0;
- for (Iterable<InputT> shard : shards) {
- AccumT accumulator = fn.createAccumulator();
- for (InputT elem : shard) {
- accumulator = fn.addInput(accumulator, elem);
- }
- if (maybeCompact++ % 2 == 0) {
- accumulator = fn.compact(accumulator);
- }
- accumulators.add(accumulator);
- }
- AccumT merged = fn.mergeAccumulators(accumulators);
- assertThat(fn.extractOutput(merged), matcher);
- }
-
- private static <T> List<List<T>> shardEvenly(List<T> input, int numShards) {
- List<List<T>> shards = new ArrayList<>(numShards);
- for (int i = 0; i < numShards; i++) {
- shards.add(input.subList(i * input.size() / numShards,
- (i + 1) * input.size() / numShards));
- }
- return shards;
- }
-
- private static <T> List<List<T>> shardExponentially(
- List<T> input, double base) {
- assert base > 1.0;
- List<List<T>> shards = new ArrayList<>();
- int end = input.size();
- while (end > 0) {
- int start = (int) (end / base);
- shards.add(input.subList(start, end));
- end = start;
- }
- return shards;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/WindowMatchers.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/WindowMatchers.java
deleted file mode 100644
index 9d7cfc8..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/WindowMatchers.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Objects;
-
-/**
- * Matchers that are useful for working with Windowing, Timestamps, etc.
- */
-public class WindowMatchers {
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher,
- Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
- return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowsMatcher);
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher) {
- return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- T value, long timestamp, long windowStart, long windowEnd) {
- return WindowMatchers.<T>isSingleWindowedValue(
- Matchers.equalTo(value), timestamp, windowStart, windowEnd);
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) {
- IntervalWindow intervalWindow =
- new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
- return WindowMatchers.<T>isSingleWindowedValue(
- valueMatcher,
- Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
- Matchers.<BoundedWindow>equalTo(intervalWindow));
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher,
- Matcher<? super BoundedWindow> windowMatcher) {
- return new WindowedValueMatcher<T>(
- valueMatcher, timestampMatcher, Matchers.contains(windowMatcher));
- }
-
- public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
- return Matchers.equalTo(new IntervalWindow(new Instant(start), new Instant(end)));
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> valueWithPaneInfo(final PaneInfo paneInfo) {
- return new TypeSafeMatcher<WindowedValue<? extends T>>() {
- @Override
- public void describeTo(Description description) {
- description
- .appendText("WindowedValue(paneInfo = ").appendValue(paneInfo).appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(WindowedValue<? extends T> item) {
- return Objects.equals(item.getPane(), paneInfo);
- }
-
- @Override
- protected void describeMismatchSafely(
- WindowedValue<? extends T> item, Description mismatchDescription) {
- mismatchDescription.appendValue(item.getPane());
- }
- };
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @SafeVarargs
- public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows(
- Matcher<W>... windows) {
- return (Matcher) Matchers.<W>containsInAnyOrder(windows);
- }
-
- private WindowMatchers() {}
-
- private static class WindowedValueMatcher<T> extends TypeSafeMatcher<WindowedValue<? extends T>> {
-
- private Matcher<? super T> valueMatcher;
- private Matcher<? super Instant> timestampMatcher;
- private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher;
-
- private WindowedValueMatcher(
- Matcher<? super T> valueMatcher,
- Matcher<? super Instant> timestampMatcher,
- Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
- this.valueMatcher = valueMatcher;
- this.timestampMatcher = timestampMatcher;
- this.windowsMatcher = windowsMatcher;
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a WindowedValue(").appendValue(valueMatcher)
- .appendText(", ").appendValue(timestampMatcher)
- .appendText(", ").appendValue(windowsMatcher)
- .appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
- return valueMatcher.matches(windowedValue.getValue())
- && timestampMatcher.matches(windowedValue.getTimestamp())
- && windowsMatcher.matches(windowedValue.getWindows());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java
deleted file mode 100644
index db6e944..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java
+++ /dev/null
@@ -1,754 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder.Context;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.AvroName;
-import org.apache.avro.reflect.AvroSchema;
-import org.apache.avro.reflect.Nullable;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.Stringable;
-import org.apache.avro.reflect.Union;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.util.Utf8;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/** Tests for {@link AvroCoder}. */
-@RunWith(JUnit4.class)
-public class AvroCoderTest {
-
- @DefaultCoder(AvroCoder.class)
- private static class Pojo {
- public String text;
- public int count;
-
- // Empty constructor required for Avro decoding.
- @SuppressWarnings("unused")
- public Pojo() {
- }
-
- public Pojo(String text, int count) {
- this.text = text;
- this.count = count;
- }
-
- // auto-generated
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Pojo pojo = (Pojo) o;
-
- if (count != pojo.count) {
- return false;
- }
- if (text != null
- ? !text.equals(pojo.text)
- : pojo.text != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
-
- @Override
- public String toString() {
- return "Pojo{"
- + "text='" + text + '\''
- + ", count=" + count
- + '}';
- }
- }
-
- private static class GetTextFn extends DoFn<Pojo, String> {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().text);
- }
- }
-
- @Test
- public void testAvroCoderEncoding() throws Exception {
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
- CloudObject encoding = coder.asCloudObject();
-
- Assert.assertThat(encoding.keySet(),
- Matchers.containsInAnyOrder("@type", "type", "schema", "encoding_id"));
- }
-
- @Test
- public void testPojoEncoding() throws Exception {
- Pojo value = new Pojo("Hello", 42);
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, value);
- }
-
- @Test
- public void testPojoEncodingId() throws Exception {
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
- CoderProperties.coderHasEncodingId(coder, Pojo.class.getName());
- }
-
- @Test
- public void testGenericRecordEncoding() throws Exception {
- String schemaString =
- "{\"namespace\": \"example.avro\",\n"
- + " \"type\": \"record\",\n"
- + " \"name\": \"User\",\n"
- + " \"fields\": [\n"
- + " {\"name\": \"name\", \"type\": \"string\"},\n"
- + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
- + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
- + " ]\n"
- + "}";
- Schema schema = (new Schema.Parser()).parse(schemaString);
-
- GenericRecord before = new GenericData.Record(schema);
- before.put("name", "Bob");
- before.put("favorite_number", 256);
- // Leave favorite_color null
-
- AvroCoder<GenericRecord> coder = AvroCoder.of(GenericRecord.class, schema);
-
- CoderProperties.coderDecodeEncodeEqual(coder, before);
- Assert.assertEquals(schema, coder.getSchema());
- }
-
- @Test
- public void testEncodingNotBuffered() throws Exception {
- // This test ensures that the coder doesn't read ahead and buffer data.
- // Reading ahead causes a problem if the stream consists of records of different
- // types.
- Pojo before = new Pojo("Hello", 42);
-
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
- SerializableCoder<Integer> intCoder = SerializableCoder.of(Integer.class);
-
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-
- Context context = Context.NESTED;
- coder.encode(before, outStream, context);
- intCoder.encode(10, outStream, context);
-
- ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
-
- Pojo after = coder.decode(inStream, context);
- Assert.assertEquals(before, after);
-
- Integer intAfter = intCoder.decode(inStream, context);
- Assert.assertEquals(new Integer(10), intAfter);
- }
-
- @Test
- public void testDefaultCoder() throws Exception {
- Pipeline p = TestPipeline.create();
-
- // Use MyRecord as input and output types without explicitly specifying
- // a coder (this uses the default coders, which may not be AvroCoder).
- PCollection<String> output =
- p.apply(Create.of(new Pojo("hello", 1), new Pojo("world", 2)))
- .apply(ParDo.of(new GetTextFn()));
-
- DataflowAssert.that(output)
- .containsInAnyOrder("hello", "world");
- p.run();
- }
-
- @Test
- public void testAvroCoderIsSerializable() throws Exception {
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
-
- // Check that the coder is serializable using the regular JSON approach.
- SerializableUtils.ensureSerializable(coder);
- }
-
- private final void assertDeterministic(AvroCoder<?> coder) {
- try {
- coder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- fail("Expected " + coder + " to be deterministic, but got:\n" + e);
- }
- }
-
- private final void assertNonDeterministic(AvroCoder<?> coder,
- Matcher<String> reason1) {
- try {
- coder.verifyDeterministic();
- fail("Expected " + coder + " to be non-deterministic.");
- } catch (NonDeterministicException e) {
- assertThat(e.getReasons(), Matchers.<String>iterableWithSize(1));
- assertThat(e.getReasons(), Matchers.<String>contains(reason1));
- }
- }
-
- @Test
- public void testDeterministicInteger() {
- assertDeterministic(AvroCoder.of(Integer.class));
- }
-
- @Test
- public void testDeterministicInt() {
- assertDeterministic(AvroCoder.of(int.class));
- }
-
- private static class SimpleDeterministicClass {
- @SuppressWarnings("unused")
- private Integer intField;
- @SuppressWarnings("unused")
- private char charField;
- @SuppressWarnings("unused")
- private Integer[] intArray;
- @SuppressWarnings("unused")
- private Utf8 utf8field;
- }
-
- @Test
- public void testDeterministicSimple() {
- assertDeterministic(AvroCoder.of(SimpleDeterministicClass.class));
- }
-
- private static class UnorderedMapClass {
- @SuppressWarnings("unused")
- private Map<String, String> mapField;
- }
-
- private Matcher<String> reason(final String prefix, final String messagePart) {
- return new TypeSafeMatcher<String>(String.class) {
- @Override
- public void describeTo(Description description) {
- description.appendText(String.format("Reason starting with '%s:' containing '%s'",
- prefix, messagePart));
- }
-
- @Override
- protected boolean matchesSafely(String item) {
- return item.startsWith(prefix + ":") && item.contains(messagePart);
- }
- };
- }
-
- private Matcher<String> reasonClass(Class<?> clazz, String message) {
- return reason(clazz.getName(), message);
- }
-
- private Matcher<String> reasonField(
- Class<?> clazz, String field, String message) {
- return reason(clazz.getName() + "#" + field, message);
- }
-
- @Test
- public void testDeterministicUnorderedMap() {
- assertNonDeterministic(AvroCoder.of(UnorderedMapClass.class),
- reasonField(UnorderedMapClass.class, "mapField",
- "java.util.Map<java.lang.String, java.lang.String> "
- + "may not be deterministically ordered"));
- }
-
- private static class NonDeterministicArray {
- @SuppressWarnings("unused")
- private UnorderedMapClass[] arrayField;
- }
- @Test
- public void testDeterministicNonDeterministicArray() {
- assertNonDeterministic(AvroCoder.of(NonDeterministicArray.class),
- reasonField(UnorderedMapClass.class, "mapField",
- "java.util.Map<java.lang.String, java.lang.String>"
- + " may not be deterministically ordered"));
- }
-
- private static class SubclassOfUnorderedMapClass extends UnorderedMapClass {}
-
-
- @Test
- public void testDeterministicNonDeterministicChild() {
- // Super class has non deterministic fields.
- assertNonDeterministic(AvroCoder.of(SubclassOfUnorderedMapClass.class),
- reasonField(UnorderedMapClass.class, "mapField",
- "may not be deterministically ordered"));
- }
-
- private static class SubclassHidingParent extends UnorderedMapClass {
- @SuppressWarnings("unused")
- @AvroName("mapField2") // AvroName is not enough
- private int mapField;
- }
-
- @Test
- public void testAvroProhibitsShadowing() {
- // This test verifies that Avro won't serialize a class with two fields of
- // the same name. This is important for our error reporting, and also how
- // we lookup a field.
- try {
- ReflectData.get().getSchema(SubclassHidingParent.class);
- fail("Expected AvroTypeException");
- } catch (AvroTypeException e) {
- assertThat(e.getMessage(), containsString("mapField"));
- assertThat(e.getMessage(), containsString("two fields named"));
- }
- }
-
- private static class FieldWithAvroName {
- @AvroName("name")
- @SuppressWarnings("unused")
- private int someField;
- }
-
- @Test
- public void testDeterministicWithAvroName() {
- assertDeterministic(AvroCoder.of(FieldWithAvroName.class));
- }
-
- @Test
- public void testDeterminismSortedMap() {
- assertDeterministic(AvroCoder.of(StringSortedMapField.class));
- }
-
- private static class StringSortedMapField {
- @SuppressWarnings("unused")
- SortedMap<String, String> sortedMapField;
- }
-
- @Test
- public void testDeterminismTreeMapValue() {
- // The value is non-deterministic, so we should fail.
- assertNonDeterministic(AvroCoder.of(TreeMapNonDetValue.class),
- reasonField(UnorderedMapClass.class, "mapField",
- "java.util.Map<java.lang.String, java.lang.String> "
- + "may not be deterministically ordered"));
- }
-
- private static class TreeMapNonDetValue {
- @SuppressWarnings("unused")
- TreeMap<String, NonDeterministicArray> nonDeterministicField;
- }
-
- @Test
- public void testDeterminismUnorderedMap() {
- // LinkedHashMap is not deterministically ordered, so we should fail.
- assertNonDeterministic(AvroCoder.of(LinkedHashMapField.class),
- reasonField(LinkedHashMapField.class, "nonDeterministicMap",
- "java.util.LinkedHashMap<java.lang.String, java.lang.String> "
- + "may not be deterministically ordered"));
- }
-
- private static class LinkedHashMapField {
- @SuppressWarnings("unused")
- LinkedHashMap<String, String> nonDeterministicMap;
- }
-
- @Test
- public void testDeterminismCollection() {
- assertNonDeterministic(AvroCoder.of(StringCollection.class),
- reasonField(StringCollection.class, "stringCollection",
- "java.util.Collection<java.lang.String> may not be deterministically ordered"));
- }
-
- private static class StringCollection {
- @SuppressWarnings("unused")
- Collection<String> stringCollection;
- }
-
- @Test
- public void testDeterminismList() {
- assertDeterministic(AvroCoder.of(StringList.class));
- assertDeterministic(AvroCoder.of(StringArrayList.class));
- }
-
- private static class StringList {
- @SuppressWarnings("unused")
- List<String> stringCollection;
- }
-
- private static class StringArrayList {
- @SuppressWarnings("unused")
- ArrayList<String> stringCollection;
- }
-
- @Test
- public void testDeterminismSet() {
- assertDeterministic(AvroCoder.of(StringSortedSet.class));
- assertDeterministic(AvroCoder.of(StringTreeSet.class));
- assertNonDeterministic(AvroCoder.of(StringHashSet.class),
- reasonField(StringHashSet.class, "stringCollection",
- "java.util.HashSet<java.lang.String> may not be deterministically ordered"));
- }
-
- private static class StringSortedSet{
- @SuppressWarnings("unused")
- SortedSet<String> stringCollection;
- }
-
- private static class StringTreeSet {
- @SuppressWarnings("unused")
- TreeSet<String> stringCollection;
- }
-
- private static class StringHashSet {
- @SuppressWarnings("unused")
- HashSet<String> stringCollection;
- }
-
- @Test
- public void testDeterminismCollectionValue() {
- assertNonDeterministic(AvroCoder.of(OrderedSetOfNonDetValues.class),
- reasonField(UnorderedMapClass.class, "mapField",
- "may not be deterministically ordered"));
- assertNonDeterministic(AvroCoder.of(ListOfNonDetValues.class),
- reasonField(UnorderedMapClass.class, "mapField",
- "may not be deterministically ordered"));
- }
-
- private static class OrderedSetOfNonDetValues {
- @SuppressWarnings("unused")
- SortedSet<UnorderedMapClass> set;
- }
-
- private static class ListOfNonDetValues {
- @SuppressWarnings("unused")
- List<UnorderedMapClass> set;
- }
-
- @Test
- public void testDeterminismUnion() {
- assertDeterministic(AvroCoder.of(DeterministicUnionBase.class));
- assertNonDeterministic(AvroCoder.of(NonDeterministicUnionBase.class),
- reasonField(UnionCase3.class, "mapField", "may not be deterministically ordered"));
- }
-
- @Test
- public void testDeterminismStringable() {
- assertDeterministic(AvroCoder.of(String.class));
- assertNonDeterministic(AvroCoder.of(StringableClass.class),
- reasonClass(StringableClass.class, "may not have deterministic #toString()"));
- }
-
- @Stringable
- private static class StringableClass {
- }
-
- @Test
- public void testDeterminismCyclicClass() {
- assertNonDeterministic(AvroCoder.of(Cyclic.class),
- reasonField(Cyclic.class, "cyclicField", "appears recursively"));
- assertNonDeterministic(AvroCoder.of(CyclicField.class),
- reasonField(Cyclic.class, "cyclicField",
- Cyclic.class.getName() + " appears recursively"));
- assertNonDeterministic(AvroCoder.of(IndirectCycle1.class),
- reasonField(IndirectCycle2.class, "field2",
- IndirectCycle1.class.getName() + " appears recursively"));
- }
-
- private static class Cyclic {
- @SuppressWarnings("unused")
- int intField;
- @SuppressWarnings("unused")
- Cyclic cyclicField;
- }
-
- private static class CyclicField {
- @SuppressWarnings("unused")
- Cyclic cyclicField2;
- }
-
- private static class IndirectCycle1 {
- @SuppressWarnings("unused")
- IndirectCycle2 field1;
- }
-
- private static class IndirectCycle2 {
- @SuppressWarnings("unused")
- IndirectCycle1 field2;
- }
-
- @Test
- public void testDeterminismHasGenericRecord() {
- assertDeterministic(AvroCoder.of(HasGenericRecord.class));
- }
-
- private static class HasGenericRecord {
- @AvroSchema("{\"name\": \"bar\", \"type\": \"record\", \"fields\": ["
- + "{\"name\": \"foo\", \"type\": \"int\"}]}")
- GenericRecord genericRecord;
- }
-
- @Test
- public void testDeterminismHasCustomSchema() {
- assertNonDeterministic(AvroCoder.of(HasCustomSchema.class),
- reasonField(HasCustomSchema.class, "withCustomSchema",
- "Custom schemas are only supported for subtypes of IndexedRecord."));
- }
-
- private static class HasCustomSchema {
- @AvroSchema("{\"name\": \"bar\", \"type\": \"record\", \"fields\": ["
- + "{\"name\": \"foo\", \"type\": \"int\"}]}")
- int withCustomSchema;
- }
-
- @Test
- public void testAvroCoderTreeMapDeterminism()
- throws Exception, NonDeterministicException {
- TreeMapField size1 = new TreeMapField();
- TreeMapField size2 = new TreeMapField();
-
- // Different order for entries
- size1.field.put("hello", "world");
- size1.field.put("another", "entry");
-
- size2.field.put("another", "entry");
- size2.field.put("hello", "world");
-
- AvroCoder<TreeMapField> coder = AvroCoder.of(TreeMapField.class);
- coder.verifyDeterministic();
-
- ByteArrayOutputStream outStream1 = new ByteArrayOutputStream();
- ByteArrayOutputStream outStream2 = new ByteArrayOutputStream();
-
- Context context = Context.NESTED;
- coder.encode(size1, outStream1, context);
- coder.encode(size2, outStream2, context);
-
- assertTrue(Arrays.equals(
- outStream1.toByteArray(), outStream2.toByteArray()));
- }
-
- private static class TreeMapField {
- private TreeMap<String, String> field = new TreeMap<>();
- }
-
- @Union({ UnionCase1.class, UnionCase2.class })
- private abstract static class DeterministicUnionBase {}
-
- @Union({ UnionCase1.class, UnionCase2.class, UnionCase3.class })
- private abstract static class NonDeterministicUnionBase {}
- private static class UnionCase1 extends DeterministicUnionBase {}
- private static class UnionCase2 extends DeterministicUnionBase {
- @SuppressWarnings("unused")
- String field;
- }
-
- private static class UnionCase3 extends NonDeterministicUnionBase {
- @SuppressWarnings("unused")
- private Map<String, String> mapField;
- }
-
- @Test
- public void testAvroCoderSimpleSchemaDeterminism() {
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .endRecord()));
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("int").type().intType().noDefault()
- .endRecord()));
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("string").type().stringType().noDefault()
- .endRecord()));
-
- assertNonDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("map").type().map().values().stringType().noDefault()
- .endRecord()),
- reason("someRecord.map", "HashMap to represent MAPs"));
-
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("array").type().array().items().stringType().noDefault()
- .endRecord()));
-
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("enum").type().enumeration("anEnum").symbols("s1", "s2").enumDefault("s1")
- .endRecord()));
-
- assertDeterministic(AvroCoder.of(SchemaBuilder.unionOf()
- .intType().and()
- .record("someRecord").fields().nullableString("someField", "").endRecord()
- .endUnion()));
- }
-
- @Test
- public void testAvroCoderStrings() {
- // Custom Strings in Records
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("string").prop(SpecificData.CLASS_PROP, "java.lang.String")
- .type().stringType().noDefault()
- .endRecord()));
- assertNonDeterministic(AvroCoder.of(SchemaBuilder.record("someRecord").fields()
- .name("string").prop(SpecificData.CLASS_PROP, "unknownString")
- .type().stringType().noDefault()
- .endRecord()),
- reason("someRecord.string", "unknownString is not known to be deterministic"));
-
- // Custom Strings in Unions
- assertNonDeterministic(AvroCoder.of(SchemaBuilder.unionOf()
- .intType().and()
- .record("someRecord").fields()
- .name("someField").prop(SpecificData.CLASS_PROP, "unknownString")
- .type().stringType().noDefault().endRecord()
- .endUnion()),
- reason("someRecord.someField", "unknownString is not known to be deterministic"));
- }
-
- @Test
- public void testAvroCoderNestedRecords() {
- // Nested Record
- assertDeterministic(AvroCoder.of(SchemaBuilder.record("nestedRecord").fields()
- .name("subRecord").type().record("subRecord").fields()
- .name("innerField").type().stringType().noDefault()
- .endRecord().noDefault()
- .endRecord()));
- }
-
- @Test
- public void testAvroCoderCyclicRecords() {
- // Recursive record
- assertNonDeterministic(AvroCoder.of(SchemaBuilder.record("cyclicRecord").fields()
- .name("cycle").type("cyclicRecord").noDefault()
- .endRecord()),
- reason("cyclicRecord.cycle", "cyclicRecord appears recursively"));
- }
-
- private static class NullableField {
- @SuppressWarnings("unused")
- @Nullable private String nullable;
- }
-
- @Test
- public void testNullableField() {
- assertDeterministic(AvroCoder.of(NullableField.class));
- }
-
- private static class NullableNonDeterministicField {
- @SuppressWarnings("unused")
- @Nullable private NonDeterministicArray nullableNonDetArray;
- }
-
- private static class NullableCyclic {
- @SuppressWarnings("unused")
- @Nullable private NullableCyclic nullableNullableCyclicField;
- }
-
- private static class NullableCyclicField {
- @SuppressWarnings("unused")
- @Nullable private Cyclic nullableCyclicField;
- }
-
- @Test
- public void testNullableNonDeterministicField() {
- assertNonDeterministic(AvroCoder.of(NullableCyclic.class),
- reasonField(NullableCyclic.class, "nullableNullableCyclicField",
- NullableCyclic.class.getName() + " appears recursively"));
- assertNonDeterministic(AvroCoder.of(NullableCyclicField.class),
- reasonField(Cyclic.class, "cyclicField",
- Cyclic.class.getName() + " appears recursively"));
- assertNonDeterministic(AvroCoder.of(NullableNonDeterministicField.class),
- reasonField(UnorderedMapClass.class, "mapField",
- " may not be deterministically ordered"));
- }
-
- /**
- * Tests that a parameterized class can have an automatically generated schema if the generic
- * field is annotated with a union tag.
- */
- @Test
- public void testGenericClassWithUnionAnnotation() throws Exception {
- // Cast is safe as long as the same coder is used for encoding and decoding.
- @SuppressWarnings({"unchecked", "rawtypes"})
- AvroCoder<GenericWithAnnotation<String>> coder =
- (AvroCoder) AvroCoder.of(GenericWithAnnotation.class);
-
- assertThat(coder.getSchema().getField("onlySomeTypesAllowed").schema().getType(),
- equalTo(Schema.Type.UNION));
-
- CoderProperties.coderDecodeEncodeEqual(coder, new GenericWithAnnotation<>("hello"));
- }
-
- private static class GenericWithAnnotation<T> {
- @AvroSchema("[\"string\", \"int\"]")
- private T onlySomeTypesAllowed;
-
- public GenericWithAnnotation(T value) {
- onlySomeTypesAllowed = value;
- }
-
- // For deserialization only
- @SuppressWarnings("unused")
- protected GenericWithAnnotation() { }
-
- @Override
- public boolean equals(Object other) {
- return other instanceof GenericWithAnnotation
- && onlySomeTypesAllowed.equals(((GenericWithAnnotation<?>) other).onlySomeTypesAllowed);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), onlySomeTypesAllowed);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoderTest.java
deleted file mode 100644
index d96c208..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test case for {@link BigEndianIntegerCoder}.
- */
-@RunWith(JUnit4.class)
-public class BigEndianIntegerCoderTest {
-
- private static final Coder<Integer> TEST_CODER = BigEndianIntegerCoder.of();
-
- private static final List<Integer> TEST_VALUES = Arrays.asList(
- -11, -3, -1, 0, 1, 5, 13, 29,
- Integer.MAX_VALUE,
- Integer.MIN_VALUE);
-
- @Test
- public void testDecodeEncodeEqual() throws Exception {
- for (Integer value : TEST_VALUES) {
- CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- // This should never change. The definition of big endian encoding is fixed.
- private static final String EXPECTED_ENCODING_ID = "";
-
- @Test
- public void testEncodingId() throws Exception {
- CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
- }
-
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link com.google.cloud.dataflow.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "____9Q",
- "_____Q",
- "_____w",
- "AAAAAA",
- "AAAAAQ",
- "AAAABQ",
- "AAAADQ",
- "AAAAHQ",
- "f____w",
- "gAAAAA");
-
- @Test
- public void testWireFormatEncode() throws Exception {
- CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null Integer");
-
- CoderUtils.encodeToBase64(TEST_CODER, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoderTest.java
deleted file mode 100644
index ea486c1..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoderTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test case for {@link BigEndianLongCoder}.
- */
-@RunWith(JUnit4.class)
-public class BigEndianLongCoderTest {
-
- private static final Coder<Long> TEST_CODER = BigEndianLongCoder.of();
-
- private static final List<Long> TEST_VALUES = Arrays.asList(
- -11L, -3L, -1L, 0L, 1L, 5L, 13L, 29L,
- Integer.MAX_VALUE + 131L,
- Integer.MIN_VALUE - 29L,
- Long.MAX_VALUE,
- Long.MIN_VALUE);
-
- @Test
- public void testDecodeEncodeEqual() throws Exception {
- for (Long value : TEST_VALUES) {
- CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- // This should never change. The definition of big endian is fixed.
- private static final String EXPECTED_ENCODING_ID = "";
-
- @Test
- public void testEncodingId() throws Exception {
- CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
- }
-
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link com.google.cloud.dataflow.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "__________U",
- "__________0",
- "__________8",
- "AAAAAAAAAAA",
- "AAAAAAAAAAE",
- "AAAAAAAAAAU",
- "AAAAAAAAAA0",
- "AAAAAAAAAB0",
- "AAAAAIAAAII",
- "_____3___-M",
- "f_________8",
- "gAAAAAAAAAA");
-
- @Test
- public void testWireFormatEncode() throws Exception {
- CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null Long");
-
- CoderUtils.encodeToBase64(TEST_CODER, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoderTest.java
deleted file mode 100644
index 989bc7f..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.common.CounterTestUtils;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Unit tests for {@link ByteArrayCoder}.
- */
-@RunWith(JUnit4.class)
-public class ByteArrayCoderTest {
-
- private static final ByteArrayCoder TEST_CODER = ByteArrayCoder.of();
-
- private static final List<byte[]> TEST_VALUES = Arrays.asList(
- new byte[]{0xa, 0xb, 0xc},
- new byte[]{0xd, 0x3},
- new byte[]{0xd, 0xe},
- new byte[]{});
-
- @Test
- public void testDecodeEncodeEquals() throws Exception {
- for (byte[] value : TEST_VALUES) {
- CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- @Test
- public void testRegisterByteSizeObserver() throws Exception {
- CounterTestUtils.testByteCount(ByteArrayCoder.of(), Coder.Context.OUTER,
- new byte[][]{{ 0xa, 0xb, 0xc }});
-
- CounterTestUtils.testByteCount(ByteArrayCoder.of(), Coder.Context.NESTED,
- new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
- }
-
- @Test
- public void testStructuralValueConsistentWithEquals() throws Exception {
- // We know that byte array coders are NOT compatible with equals
- // (aka injective w.r.t. Object.equals)
- for (byte[] value1 : TEST_VALUES) {
- for (byte[] value2 : TEST_VALUES) {
- CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
- }
- }
- }
-
- @Test
- public void testEncodeThenMutate() throws Exception {
- byte[] input = { 0x7, 0x3, 0xA, 0xf };
- byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, input);
- input[1] = 0x9;
- byte[] decoded = CoderUtils.decodeFromByteArray(TEST_CODER, encoded);
-
- // now that I have mutated the input, the output should NOT match
- assertThat(input, not(equalTo(decoded)));
- }
-
- @Test
- public void testEncodeAndOwn() throws Exception {
- for (byte[] value : TEST_VALUES) {
- byte[] encodedSlow = CoderUtils.encodeToByteArray(TEST_CODER, value);
- byte[] encodedFast = encodeToByteArrayAndOwn(TEST_CODER, value);
- assertThat(encodedSlow, equalTo(encodedFast));
- }
- }
-
- private static byte[] encodeToByteArrayAndOwn(ByteArrayCoder coder, byte[] value)
- throws IOException {
- return encodeToByteArrayAndOwn(coder, value, Coder.Context.OUTER);
- }
-
- private static byte[] encodeToByteArrayAndOwn(
- ByteArrayCoder coder, byte[] value, Coder.Context context) throws IOException {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- coder.encodeAndOwn(value, os, context);
- return os.toByteArray();
- }
-
- // If this changes, it implies the binary format has changed.
- private static final String EXPECTED_ENCODING_ID = "";
-
- @Test
- public void testEncodingId() throws Exception {
- CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
- }
-
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link com.google.cloud.dataflow.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "CgsM",
- "DQM",
- "DQ4",
- "");
-
- @Test
- public void testWireFormatEncode() throws Exception {
- CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null byte[]");
-
- CoderUtils.encodeToBase64(TEST_CODER, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteCoderTest.java
deleted file mode 100644
index 6cb852e..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteCoderTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test case for {@link ByteCoder}.
- */
-@RunWith(JUnit4.class)
-public class ByteCoderTest {
-
- private static final Coder<Byte> TEST_CODER = ByteCoder.of();
-
- private static final List<Byte> TEST_VALUES = Arrays.asList(
- (byte) 1,
- (byte) 4,
- (byte) 6,
- (byte) 50,
- (byte) 124,
- Byte.MAX_VALUE,
- Byte.MIN_VALUE);
-
- @Test
- public void testDecodeEncodeEqual() throws Exception {
- for (Byte value : TEST_VALUES) {
- CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- // This should never change. The format is fixed by Java.
- private static final String EXPECTED_ENCODING_ID = "";
-
- @Test
- public void testEncodingId() throws Exception {
- CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
- }
-
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link com.google.cloud.dataflow.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "AQ",
- "BA",
- "Bg",
- "Mg",
- "fA",
- "fw",
- "gA");
-
- @Test
- public void testWireFormatEncode() throws Exception {
- CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null Byte");
-
- CoderUtils.encodeToBase64(TEST_CODER, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoderTest.java
deleted file mode 100644
index debae71..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoderTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.coders.Coder.Context;
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test case for {@link ByteStringCoder}.
- */
-@RunWith(JUnit4.class)
-public class ByteStringCoderTest {
-
- private static final ByteStringCoder TEST_CODER = ByteStringCoder.of();
-
- private static final List<String> TEST_STRING_VALUES = Arrays.asList(
- "", "a", "13", "hello",
- "a longer string with spaces and all that",
- "a string with a \n newline",
- "???????????????");
- private static final ImmutableList<ByteString> TEST_VALUES;
- static {
- ImmutableList.Builder<ByteString> builder = ImmutableList.<ByteString>builder();
- for (String s : TEST_STRING_VALUES) {
- builder.add(ByteString.copyFrom(s.getBytes()));
- }
- TEST_VALUES = builder.build();
- }
-
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link com.google.cloud.dataflow.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "",
- "YQ",
- "MTM",
- "aGVsbG8",
- "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
- "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
- "Pz8_Pz8_Pz8_Pz8_Pz8_");
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testDecodeEncodeEqualInAllContexts() throws Exception {
- for (ByteString value : TEST_VALUES) {
- CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- @Test
- public void testWireFormatEncode() throws Exception {
- CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
- }
-
- @Test
- public void testCoderDeterministic() throws Throwable {
- TEST_CODER.verifyDeterministic();
- }
-
- @Test
- public void testConsistentWithEquals() {
- assertTrue(TEST_CODER.consistentWithEquals());
- }
-
- @Test
- public void testEncodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null ByteString");
-
- CoderUtils.encodeToBase64(TEST_CODER, null);
- }
-
- @Test
- public void testNestedCoding() throws Throwable {
- Coder<List<ByteString>> listCoder = ListCoder.of(TEST_CODER);
- CoderProperties.coderDecodeEncodeContentsEqual(listCoder, TEST_VALUES);
- CoderProperties.coderDecodeEncodeContentsInSameOrder(listCoder, TEST_VALUES);
- }
-
- @Test
- public void testEncodedElementByteSizeInAllContexts() throws Throwable {
- for (Context context : CoderProperties.ALL_CONTEXTS) {
- for (ByteString value : TEST_VALUES) {
- byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, context);
- assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value, context));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderFactoriesTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderFactoriesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderFactoriesTest.java
deleted file mode 100644
index 8d702bf..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderFactoriesTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-/**
- * Tests for {@link CoderFactories}.
- */
-@RunWith(JUnit4.class)
-public class CoderFactoriesTest {
-
- /**
- * Ensures that a few of our standard atomic coder classes
- * can each be built into a factory that works as expected.
- * It is presumed that testing a few, not all, suffices to
- * exercise CoderFactoryFromStaticMethods.
- */
- @Test
- public void testAtomicCoderClassFactories() {
- checkAtomicCoderFactory(StringUtf8Coder.class, StringUtf8Coder.of());
- checkAtomicCoderFactory(DoubleCoder.class, DoubleCoder.of());
- checkAtomicCoderFactory(ByteArrayCoder.class, ByteArrayCoder.of());
- }
-
- /**
- * Checks that {#link CoderFactories.fromStaticMethods} successfully
- * builds a working {@link CoderFactory} from {@link KvCoder KvCoder.class}.
- */
- @Test
- public void testKvCoderFactory() {
- CoderFactory kvCoderFactory = CoderFactories.fromStaticMethods(KvCoder.class);
- assertEquals(
- KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
- kvCoderFactory.create(Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
- }
-
- /**
- * Checks that {#link CoderFactories.fromStaticMethods} successfully
- * builds a working {@link CoderFactory} from {@link ListCoder ListCoder.class}.
- */
- @Test
- public void testListCoderFactory() {
- CoderFactory listCoderFactory = CoderFactories.fromStaticMethods(ListCoder.class);
-
- assertEquals(
- ListCoder.of(DoubleCoder.of()),
- listCoderFactory.create(Arrays.asList(DoubleCoder.of())));
- }
-
- /**
- * Checks that {#link CoderFactories.fromStaticMethods} successfully
- * builds a working {@link CoderFactory} from {@link IterableCoder IterableCoder.class}.
- */
- @Test
- public void testIterableCoderFactory() {
- CoderFactory iterableCoderFactory = CoderFactories.fromStaticMethods(IterableCoder.class);
-
- assertEquals(
- IterableCoder.of(DoubleCoder.of()),
- iterableCoderFactory.create(Arrays.asList(DoubleCoder.of())));
- }
-
- ///////////////////////////////////////////////////////////////////////
-
- /**
- * Checks that an atomic coder class can be converted into
- * a factory that then yields a coder equal to the example
- * provided.
- */
- private <T> void checkAtomicCoderFactory(
- Class<? extends Coder<T>> coderClazz,
- Coder<T> expectedCoder) {
- CoderFactory factory = CoderFactories.fromStaticMethods(coderClazz);
- @SuppressWarnings("unchecked")
- Coder<T> actualCoder = (Coder<T>) factory.create(Collections.<Coder<?>>emptyList());
- assertEquals(expectedCoder, actualCoder);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderProvidersTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderProvidersTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderProvidersTest.java
deleted file mode 100644
index 1c0a89e..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderProvidersTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Map;
-
-/**
- * Tests for {@link CoderFactories}.
- */
-@RunWith(JUnit4.class)
-public class CoderProvidersTest {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testAvroThenSerializableStringMap() throws Exception {
- CoderProvider provider = CoderProviders.firstOf(AvroCoder.PROVIDER, SerializableCoder.PROVIDER);
- Coder<Map<String, String>> coder =
- provider.getCoder(new TypeDescriptor<Map<String, String>>(){});
- assertThat(coder, instanceOf(AvroCoder.class));
- }
-
- @Test
- public void testThrowingThenSerializable() throws Exception {
- CoderProvider provider =
- CoderProviders.firstOf(new ThrowingCoderProvider(), SerializableCoder.PROVIDER);
- Coder<Integer> coder = provider.getCoder(new TypeDescriptor<Integer>(){});
- assertThat(coder, instanceOf(SerializableCoder.class));
- }
-
- @Test
- public void testNullThrows() throws Exception {
- CoderProvider provider = CoderProviders.firstOf(new ThrowingCoderProvider());
- thrown.expect(CannotProvideCoderException.class);
- thrown.expectMessage("ThrowingCoderProvider");
- provider.getCoder(new TypeDescriptor<Integer>(){});
- }
-
- private static class ThrowingCoderProvider implements CoderProvider {
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- throw new CannotProvideCoderException("ThrowingCoderProvider cannot ever provide a Coder");
- }
- }
-}