You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/06 16:57:24 UTC
[2/3] incubator-beam git commit: Move LatestFnTests to LatestFnTest
Move LatestFnTests to LatestFnTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52e43ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52e43ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52e43ac7
Branch: refs/heads/master
Commit: 52e43ac7b8257ecbcda61eb3b14406c36df08a3b
Parents: 60a8aef
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 4 13:23:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 09:49:53 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/LatestFnTest.java | 233 +++++++++++++++++++
.../beam/sdk/transforms/LatestFnTests.java | 233 -------------------
2 files changed, 233 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
new file mode 100644
index 0000000..31acb08
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link Latest.LatestFn}.
+ * */
+@RunWith(JUnit4.class)
+public class LatestFnTest {
+ private static final Instant INSTANT = new Instant(100);
+ private static final long VALUE = 100 * INSTANT.getMillis();
+
+ private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
+ private static final TimestampedValue<Long> TV_MINUS_TEN =
+ TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
+ private static final TimestampedValue<Long> TV_PLUS_TEN =
+ TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
+
+ @Rule
+ public final ExpectedException thrown = ExpectedException.none();
+
+ private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+ private final Instant baseTimestamp = Instant.now();
+
+ @Test
+ public void testDefaultValue() {
+ assertThat(fn.defaultValue(), nullValue());
+ }
+
+ @Test
+ public void testCreateAccumulator() {
+ assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
+ }
+
+ @Test
+ public void testAddInputInitialAdd() {
+ TimestampedValue<Long> input = TV;
+ assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+ }
+
+ @Test
+ public void testAddInputMinTimestamp() {
+ TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
+ assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+ }
+
+ @Test
+ public void testAddInputEarlierValue() {
+ assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
+ }
+
+ @Test
+ public void testAddInputLaterValue() {
+ assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
+ }
+
+ @Test
+ public void testAddInputSameTimestamp() {
+ TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
+ TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
+
+ assertThat("Latest for values with the same timestamp is chosen arbitrarily",
+ fn.addInput(accum, input), isOneOf(accum, input));
+ }
+
+ @Test
+ public void testAddInputNullAccumulator() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("accumulator");
+ fn.addInput(null, TV);
+ }
+
+ @Test
+ public void testAddInputNullInput() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("input");
+ fn.addInput(TV, null);
+ }
+
+ @Test
+ public void testAddInputNullValue() {
+ TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
+ assertEquals("Null values are allowed", input, fn.addInput(TV, input));
+ }
+
+ @Test
+ public void testMergeAccumulatorsMultipleValues() {
+ Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
+ TV,
+ TV_PLUS_TEN,
+ TV_MINUS_TEN
+ );
+
+ assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
+ }
+
+ @Test
+ public void testMergeAccumulatorsSingleValue() {
+ assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
+ }
+
+ @Test
+ public void testMergeAccumulatorsEmptyIterable() {
+ ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
+ assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
+ }
+
+ @Test
+ public void testMergeAccumulatorsDefaultAccumulator() {
+ TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+ assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
+ }
+
+ @Test
+ public void testMergeAccumulatorsAllDefaultAccumulators() {
+ TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+ assertEquals(defaultAccum, fn.mergeAccumulators(
+ Lists.newArrayList(defaultAccum, defaultAccum)));
+ }
+
+ @Test
+ public void testMergeAccumulatorsNullIterable() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("accumulators");
+ fn.mergeAccumulators(null);
+ }
+
+ @Test
+ public void testExtractOutput() {
+ assertEquals(TV.getValue(), fn.extractOutput(TV));
+ }
+
+ @Test
+ public void testExtractOutputDefaultAggregator() {
+ TimestampedValue<Long> accum = fn.createAccumulator();
+ assertThat(fn.extractOutput(accum), nullValue());
+ }
+
+ @Test
+ public void testExtractOutputNullValue() {
+ TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
+ assertEquals(null, fn.extractOutput(accum));
+ }
+
+ @Test
+ public void testAggregator() throws Exception {
+ LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
+ DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
+ for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
+ harness.processTimestampedElement(element);
+ }
+
+ assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
+ assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
+ assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
+ }
+
+ @Test
+ public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
+ Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+
+ CoderRegistry registry = new CoderRegistry();
+ TimestampedValue.TimestampedValueCoder<Long> inputCoder =
+ TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
+
+ assertThat("Default output coder should handle null values",
+ fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+ assertThat("Default accumulator coder should handle null values",
+ fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+ }
+
+ static class LatestAggregatorsFn<T> extends DoFn<T, T> {
+ private final T specialValue;
+ LatestAggregatorsFn(T specialValue) {
+ this.specialValue = specialValue;
+ }
+
+ Aggregator<TimestampedValue<T>, T> allValuesAgg =
+ createAggregator("allValues", new Latest.LatestFn<T>());
+
+ Aggregator<TimestampedValue<T>, T> specialValueAgg =
+ createAggregator("oneValue", new Latest.LatestFn<T>());
+
+ Aggregator<TimestampedValue<T>, T> noValuesAgg =
+ createAggregator("noValues", new Latest.LatestFn<T>());
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
+ allValuesAgg.addValue(val);
+ if (Objects.equals(c.element(), specialValue)) {
+ specialValueAgg.addValue(val);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
deleted file mode 100644
index 459a966..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.isOneOf;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link Latest.LatestFn}.
- * */
-@RunWith(JUnit4.class)
-public class LatestFnTests {
- private static final Instant INSTANT = new Instant(100);
- private static final long VALUE = 100 * INSTANT.getMillis();
-
- private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
- private static final TimestampedValue<Long> TV_MINUS_TEN =
- TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
- private static final TimestampedValue<Long> TV_PLUS_TEN =
- TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
- private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
- private final Instant baseTimestamp = Instant.now();
-
- @Test
- public void testDefaultValue() {
- assertThat(fn.defaultValue(), nullValue());
- }
-
- @Test
- public void testCreateAccumulator() {
- assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
- }
-
- @Test
- public void testAddInputInitialAdd() {
- TimestampedValue<Long> input = TV;
- assertEquals(input, fn.addInput(fn.createAccumulator(), input));
- }
-
- @Test
- public void testAddInputMinTimestamp() {
- TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
- assertEquals(input, fn.addInput(fn.createAccumulator(), input));
- }
-
- @Test
- public void testAddInputEarlierValue() {
- assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
- }
-
- @Test
- public void testAddInputLaterValue() {
- assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
- }
-
- @Test
- public void testAddInputSameTimestamp() {
- TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
- TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
-
- assertThat("Latest for values with the same timestamp is chosen arbitrarily",
- fn.addInput(accum, input), isOneOf(accum, input));
- }
-
- @Test
- public void testAddInputNullAccumulator() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("accumulator");
- fn.addInput(null, TV);
- }
-
- @Test
- public void testAddInputNullInput() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("input");
- fn.addInput(TV, null);
- }
-
- @Test
- public void testAddInputNullValue() {
- TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
- assertEquals("Null values are allowed", input, fn.addInput(TV, input));
- }
-
- @Test
- public void testMergeAccumulatorsMultipleValues() {
- Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
- TV,
- TV_PLUS_TEN,
- TV_MINUS_TEN
- );
-
- assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
- }
-
- @Test
- public void testMergeAccumulatorsSingleValue() {
- assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
- }
-
- @Test
- public void testMergeAccumulatorsEmptyIterable() {
- ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
- assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
- }
-
- @Test
- public void testMergeAccumulatorsDefaultAccumulator() {
- TimestampedValue<Long> defaultAccum = fn.createAccumulator();
- assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
- }
-
- @Test
- public void testMergeAccumulatorsAllDefaultAccumulators() {
- TimestampedValue<Long> defaultAccum = fn.createAccumulator();
- assertEquals(defaultAccum, fn.mergeAccumulators(
- Lists.newArrayList(defaultAccum, defaultAccum)));
- }
-
- @Test
- public void testMergeAccumulatorsNullIterable() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("accumulators");
- fn.mergeAccumulators(null);
- }
-
- @Test
- public void testExtractOutput() {
- assertEquals(TV.getValue(), fn.extractOutput(TV));
- }
-
- @Test
- public void testExtractOutputDefaultAggregator() {
- TimestampedValue<Long> accum = fn.createAccumulator();
- assertThat(fn.extractOutput(accum), nullValue());
- }
-
- @Test
- public void testExtractOutputNullValue() {
- TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
- assertEquals(null, fn.extractOutput(accum));
- }
-
- @Test
- public void testAggregator() throws Exception {
- LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
- DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
- for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
- harness.processTimestampedElement(element);
- }
-
- assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
- assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
- assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
- }
-
- @Test
- public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
- Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
-
- CoderRegistry registry = new CoderRegistry();
- TimestampedValue.TimestampedValueCoder<Long> inputCoder =
- TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
-
- assertThat("Default output coder should handle null values",
- fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
- assertThat("Default accumulator coder should handle null values",
- fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
- }
-
- static class LatestAggregatorsFn<T> extends DoFn<T, T> {
- private final T specialValue;
- LatestAggregatorsFn(T specialValue) {
- this.specialValue = specialValue;
- }
-
- Aggregator<TimestampedValue<T>, T> allValuesAgg =
- createAggregator("allValues", new Latest.LatestFn<T>());
-
- Aggregator<TimestampedValue<T>, T> specialValueAgg =
- createAggregator("oneValue", new Latest.LatestFn<T>());
-
- Aggregator<TimestampedValue<T>, T> noValuesAgg =
- createAggregator("noValues", new Latest.LatestFn<T>());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
- allValuesAgg.addValue(val);
- if (Objects.equals(c.element(), specialValue)) {
- specialValueAgg.addValue(val);
- }
- }
- }
-}