You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:47 UTC
[16/50] [abbrv] incubator-beam git commit: Add Latest CombineFn and
PTransforms
Add Latest CombineFn and PTransforms
Add DoFnTester support for specifying input timestamps
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ee7b620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ee7b620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ee7b620
Branch: refs/heads/gearpump-runner
Commit: 6ee7b620bf8e2ee07c0f30e9ff20363e79765405
Parents: 28ad44d
Author: Scott Wegner <sw...@google.com>
Authored: Thu Aug 18 13:56:34 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/NullableCoder.java | 7 +
.../apache/beam/sdk/transforms/DoFnTester.java | 33 ++-
.../org/apache/beam/sdk/transforms/Latest.java | 203 ++++++++++++++++
.../beam/sdk/values/TimestampedValue.java | 14 ++
.../beam/sdk/transforms/DoFnTesterTest.java | 34 ++-
.../beam/sdk/transforms/LatestFnTests.java | 233 +++++++++++++++++++
.../apache/beam/sdk/transforms/LatestTest.java | 146 ++++++++++++
.../beam/sdk/values/TimestampedValueTest.java | 83 +++++++
8 files changed, 747 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 44aadbd..9c6c7c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -65,6 +65,13 @@ public class NullableCoder<T> extends StandardCoder<T> {
this.valueCoder = valueCoder;
}
+ /**
+ * Returns the inner {@link Coder} wrapped by this {@link NullableCoder} instance.
+ */
+ public Coder<T> getValueCoder() {
+ return valueCoder;
+ }
+
@Override
public void encode(@Nullable T value, OutputStream outStream, Context context)
throws IOException, CoderException {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b867a55..0e018ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.transforms;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
@@ -221,9 +224,26 @@ public class DoFnTester<InputT, OutputT> {
* been finished
*/
public void processElement(InputT element) throws Exception {
- if (state == State.FINISHED) {
- throw new IllegalStateException("finishBundle() has already been called");
- }
+ processTimestampedElement(TimestampedValue.atMinimumTimestamp(element));
+ }
+
+ /**
+ * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
+ * context where {@link OldDoFn.ProcessContext#element} returns the
+ * given element and timestamp.
+ *
+ * <p>Will call {@link #startBundle} automatically, if it hasn't
+ * already been called.
+ *
+ * <p>The element must be non-null. {@link #processElement} uses the minimum timestamp.
+ *
+ * @throws IllegalStateException if the {@code OldDoFn} under test has already
+ * been finished
+ */
+ public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception {
+ checkNotNull(element, "Timestamped element cannot be null");
+ checkState(state != State.FINISHED, "finishBundle() has already been called");
+
if (state == State.UNSTARTED) {
startBundle();
}
@@ -522,10 +542,13 @@ public class DoFnTester<InputT, OutputT> {
private TestProcessContext<InputT, OutputT> createProcessContext(
OldDoFn<InputT, OutputT> fn,
- InputT elem) {
+ TimestampedValue<InputT> elem) {
+ WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
+ elem.getValue(), elem.getTimestamp());
+
return new TestProcessContext<>(fn,
createContext(fn),
- WindowedValue.valueInGlobalWindow(elem),
+ windowedValue,
mainOutputTag,
sideInputs);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
new file mode 100644
index 0000000..7f13649
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Iterator;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * {@link PTransform} and {@link Combine.CombineFn} for computing the latest element
+ * in a {@link PCollection}.
+ *
+ * <p>Example 1: compute the latest value for each session:
+ * <pre><code>
+ * PCollection<Long> input = ...;
+ * PCollection<Long> sessioned = input
+ * .apply(Window.<Long>into(Sessions.withGapDuration(Duration.standardMinutes(5))));
+ * PCollection<Long> latestValues = sessioned.apply(Latest.<Long>globally());
+ * </code></pre>
+ *
+ * <p>Example 2: track a latest computed value in an aggregator:
+ * <pre><code>
+ * class MyDoFn extends DoFn<String, String> {
+ * private Aggregator<TimestampedValue<Double>, Double> latestValue =
+ * createAggregator("latestValue", new Latest.LatestFn<Double>());
+ *
+ * {@literal @}ProcessElement
+ * public void processElement(ProcessContext c) {
+ * double val = // ..
+ * latestValue.addValue(TimestampedValue.of(val, c.timestamp()));
+ * // ..
+ * }
+ * }
+ * </code></pre>
+ *
+ * <p>For elements with the same timestamp, the element chosen for output is arbitrary.
+ */
+public class Latest {
+ // Do not instantiate
+ private Latest() {}
+
+ /**
+ * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is
+ * particularly useful as an {@link Aggregator}.
+ *
+ * @param <T> Type of input element.
+ * @see Latest
+ */
+ public static class LatestFn<T>
+ extends Combine.CombineFn<TimestampedValue<T>, TimestampedValue<T>, T> {
+ /** Construct a new {@link LatestFn} instance. */
+ public LatestFn() {}
+
+ @Override
+ public TimestampedValue<T> createAccumulator() {
+ return TimestampedValue.atMinimumTimestamp(null);
+ }
+
+ @Override
+ public TimestampedValue<T> addInput(TimestampedValue<T> accumulator,
+ TimestampedValue<T> input) {
+ checkNotNull(accumulator, "accumulator must be non-null");
+ checkNotNull(input, "input must be non-null");
+
+ if (input.getTimestamp().isBefore(accumulator.getTimestamp())) {
+ return accumulator;
+ } else {
+ return input;
+ }
+ }
+
+ @Override
+ public Coder<TimestampedValue<T>> getAccumulatorCoder(CoderRegistry registry,
+ Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
+ return NullableCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder(CoderRegistry registry,
+ Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
+ checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder,
+ "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder);
+
+ TimestampedValue.TimestampedValueCoder<T> inputTVCoder =
+ (TimestampedValue.TimestampedValueCoder<T>) inputCoder;
+ return NullableCoder.of(inputTVCoder.<T>getValueCoder());
+ }
+
+ @Override
+ public TimestampedValue<T> mergeAccumulators(Iterable<TimestampedValue<T>> accumulators) {
+ checkNotNull(accumulators, "accumulators must be non-null");
+
+ Iterator<TimestampedValue<T>> iter = accumulators.iterator();
+ if (!iter.hasNext()) {
+ return createAccumulator();
+ }
+
+ TimestampedValue<T> merged = iter.next();
+ while (iter.hasNext()) {
+ merged = addInput(merged, iter.next());
+ }
+
+ return merged;
+ }
+
+ @Override
+ public T extractOutput(TimestampedValue<T> accumulator) {
+ return accumulator.getValue();
+ }
+ }
+
+ /**
+ * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and returns a
+ * {@code PCollection<T>} whose contents is the latest element according to its event time, or
+ * {@literal null} if there are no elements.
+ *
+ * @param <T> The type of the elements being combined.
+ */
+ public static <T> PTransform<PCollection<T>, PCollection<T>> globally() {
+ return new Globally<>();
+ }
+
+ /**
+ * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a
+ * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key according to its
+ * event time.
+ *
+ * @param <K> The key type of the elements being combined.
+ * @param <V> The value type of the elements being combined.
+ */
+ public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() {
+ return new PerKey<>();
+ }
+
+ private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ Coder<T> inputCoder = input.getCoder();
+
+ return input
+ .apply("Reify Timestamps", ParDo.of(
+ new DoFn<T, TimestampedValue<T>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(TimestampedValue.of(c.element(), c.timestamp()));
+ }
+ })).setCoder(TimestampedValue.TimestampedValueCoder.of(inputCoder))
+ .apply("Latest Value", Combine.globally(new LatestFn<T>()))
+ .setCoder(NullableCoder.of(inputCoder));
+ }
+ }
+
+ private static class PerKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+ @Override
+ public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+ checkNotNull(input);
+ checkArgument(input.getCoder() instanceof KvCoder,
+ "Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder());
+
+ @SuppressWarnings("unchecked")
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ return input
+ .apply("Reify Timestamps", ParDo.of(
+ new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().getKey(), TimestampedValue.of(c.element().getValue(),
+ c.timestamp())));
+ }
+ })).setCoder(KvCoder.of(
+ inputCoder.getKeyCoder(),
+ TimestampedValue.TimestampedValueCoder.of(inputCoder.getValueCoder())))
+ .apply("Latest Value", Combine.<K, TimestampedValue<V>, V>perKey(new LatestFn<V>()))
+ .setCoder(inputCoder);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index f2ad616..dd80fb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.PropertyNames;
import org.joda.time.Instant;
@@ -43,6 +44,13 @@ import org.joda.time.Instant;
* @param <V> the type of the value
*/
public class TimestampedValue<V> {
+ /**
+ * Returns a new {@link TimestampedValue} with the
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE minimum timestamp}.
+ */
+ public static <V> TimestampedValue<V> atMinimumTimestamp(V value) {
+ return of(value, BoundedWindow.TIMESTAMP_MIN_VALUE);
+ }
/**
* Returns a new {@code TimestampedValue} with the given value and timestamp.
@@ -136,6 +144,10 @@ public class TimestampedValue<V> {
return Arrays.<Coder<?>>asList(valueCoder);
}
+ public Coder<T> getValueCoder() {
+ return valueCoder;
+ }
+
public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
return Arrays.<Object>asList(exampleValue.getValue());
}
@@ -147,6 +159,8 @@ public class TimestampedValue<V> {
private final Instant timestamp;
protected TimestampedValue(V value, Instant timestamp) {
+ checkNotNull(timestamp, "timestamp must be non-null");
+
this.value = value;
this.timestamp = timestamp;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 2649be5..3ed30fd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
@@ -35,7 +36,9 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -44,6 +47,7 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class DoFnTesterTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void processElement() throws Exception {
@@ -126,6 +130,16 @@ public class DoFnTesterTest {
}
@Test
+ public void processElementAfterFinish() throws Exception {
+ DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn());
+ tester.finishBundle();
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("finishBundle() has already been called");
+ tester.processElement(1L);
+ }
+
+ @Test
public void processBatch() throws Exception {
CounterDoFn counterDoFn = new CounterDoFn();
DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
@@ -145,7 +159,25 @@ public class DoFnTesterTest {
}
@Test
- public void processElementWithTimestamp() throws Exception {
+ public void processTimestampedElement() throws Exception {
+ DoFn<Long, TimestampedValue<Long>> reifyTimestamps = new ReifyTimestamps();
+
+ DoFnTester<Long, TimestampedValue<Long>> tester = DoFnTester.of(reifyTimestamps);
+
+ TimestampedValue<Long> input = TimestampedValue.of(1L, new Instant(100));
+ tester.processTimestampedElement(input);
+ assertThat(tester.takeOutputElements(), contains(input));
+ }
+
+ static class ReifyTimestamps extends DoFn<Long, TimestampedValue<Long>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(TimestampedValue.of(c.element(), c.timestamp()));
+ }
+ }
+
+ @Test
+ public void processElementWithOutputTimestamp() throws Exception {
CounterDoFn counterDoFn = new CounterDoFn();
DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
new file mode 100644
index 0000000..84b5b68
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link Latest.LatestFn}.
+ * */
+@RunWith(JUnit4.class)
+public class LatestFnTests {
+ private static final Instant INSTANT = new Instant(100);
+ private static final long VALUE = 100 * INSTANT.getMillis();
+
+ private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
+ private static final TimestampedValue<Long> TV_MINUS_TEN =
+ TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
+ private static final TimestampedValue<Long> TV_PLUS_TEN =
+ TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
+
+ @Rule
+ public final ExpectedException thrown = ExpectedException.none();
+
+ private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+ private final Instant baseTimestamp = Instant.now();
+
+ @Test
+ public void testDefaultValue() {
+ assertThat(fn.defaultValue(), nullValue());
+ }
+
+ @Test
+ public void testCreateAccumulator() {
+ assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
+ }
+
+ @Test
+ public void testAddInputInitialAdd() {
+ TimestampedValue<Long> input = TV;
+ assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+ }
+
+ @Test
+ public void testAddInputMinTimestamp() {
+ TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
+ assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+ }
+
+ @Test
+ public void testAddInputEarlierValue() {
+ assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
+ }
+
+ @Test
+ public void testAddInputLaterValue() {
+ assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
+ }
+
+ @Test
+ public void testAddInputSameTimestamp() {
+ TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
+ TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
+
+ assertThat("Latest for values with the same timestamp is chosen arbitrarily",
+ fn.addInput(accum, input), isOneOf(accum, input));
+ }
+
+ @Test
+ public void testAddInputNullAccumulator() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("accumulator"); // addInput's NPE message says "accumulator" (singular)
+ fn.addInput(null, TV);
+ }
+
+ @Test
+ public void testAddInputNullInput() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("input");
+ fn.addInput(TV, null);
+ }
+
+ @Test
+ public void testAddInputNullValue() {
+ TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
+ assertEquals("Null values are allowed", input, fn.addInput(TV, input));
+ }
+
+ @Test
+ public void testMergeAccumulatorsMultipleValues() {
+ Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
+ TV,
+ TV_PLUS_TEN,
+ TV_MINUS_TEN
+ );
+
+ assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
+ }
+
+ @Test
+ public void testMergeAccumulatorsSingleValue() {
+ assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
+ }
+
+ @Test
+ public void testMergeAccumulatorsEmptyIterable() {
+ ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
+ assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
+ }
+
+ @Test
+ public void testMergeAccumulatorsDefaultAccumulator() {
+ TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+ assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
+ }
+
+ @Test
+ public void testMergeAccumulatorsAllDefaultAccumulators() {
+ TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+ assertEquals(defaultAccum, fn.mergeAccumulators(
+ Lists.newArrayList(defaultAccum, defaultAccum)));
+ }
+
+ @Test
+ public void testMergeAccumulatorsNullIterable() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("accumulators");
+ fn.mergeAccumulators(null);
+ }
+
+ @Test
+ public void testExtractOutput() {
+ assertEquals(TV.getValue(), fn.extractOutput(TV));
+ }
+
+ @Test
+ public void testExtractOutputDefaultAggregator() {
+ TimestampedValue<Long> accum = fn.createAccumulator();
+ assertThat(fn.extractOutput(accum), nullValue());
+ }
+
+ @Test
+ public void testExtractOutputNullValue() {
+ TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
+ assertEquals(null, fn.extractOutput(accum));
+ }
+
+ @Test
+ public void testAggregator() throws Exception {
+ LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
+ DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
+ for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
+ harness.processTimestampedElement(element);
+ }
+
+ assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
+ assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
+ assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
+ }
+
+ @Test
+ public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
+ Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+
+ CoderRegistry registry = new CoderRegistry();
+ TimestampedValue.TimestampedValueCoder<Long> inputCoder =
+ TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
+
+ assertThat("Default output coder should handle null values",
+ fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+ assertThat("Default accumulator coder should handle null values",
+ fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+ }
+
+ static class LatestAggregatorsFn<T> extends DoFn<T, T> {
+ private final T specialValue;
+ LatestAggregatorsFn(T specialValue) {
+ this.specialValue = specialValue;
+ }
+
+ Aggregator<TimestampedValue<T>, T> allValuesAgg =
+ createAggregator("allValues", new Latest.LatestFn<T>());
+
+ Aggregator<TimestampedValue<T>, T> specialValueAgg =
+ createAggregator("oneValue", new Latest.LatestFn<T>());
+
+ Aggregator<TimestampedValue<T>, T> noValuesAgg =
+ createAggregator("noValues", new Latest.LatestFn<T>());
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
+ allValuesAgg.addValue(val);
+ if (Objects.equals(c.element(), specialValue)) {
+ specialValueAgg.addValue(val);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
new file mode 100644
index 0000000..ce9ae37
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link Latest} {@link PTransform} and {@link Combine.CombineFn}.
+ */
+@RunWith(JUnit4.class)
+public class LatestTest implements Serializable {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testGloballyEventTimestamp() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<String> output =
+ p.apply(Create.timestamped(
+ TimestampedValue.of("foo", new Instant(100)),
+ TimestampedValue.of("bar", new Instant(300)),
+ TimestampedValue.of("baz", new Instant(200))
+ ))
+ .apply(Latest.<String>globally());
+
+ PAssert.that(output).containsInAnyOrder("bar");
+ p.run();
+ }
+
+ @Test
+ public void testGloballyOutputCoder() {
+ TestPipeline p = TestPipeline.create();
+ BigEndianLongCoder inputCoder = BigEndianLongCoder.of();
+
+ PCollection<Long> output =
+ p.apply(Create.of(1L, 2L).withCoder(inputCoder))
+ .apply(Latest.<Long>globally());
+
+ Coder<Long> outputCoder = output.getCoder();
+ assertThat(outputCoder, instanceOf(NullableCoder.class));
+ assertEquals(inputCoder, ((NullableCoder<?>) outputCoder).getValueCoder());
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testGloballyEmptyCollection() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> emptyInput = p.apply(Create.<Long>of()
+ // Explicitly set coder such that the runner enforces encodability.
+ .withCoder(VarLongCoder.of()));
+ PCollection<Long> output = emptyInput.apply(Latest.<Long>globally());
+
+ PAssert.that(output).containsInAnyOrder((Long) null);
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testPerKeyEventTimestamp() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<KV<String, String>> output =
+ p.apply(Create.timestamped(
+ TimestampedValue.of(KV.of("A", "foo"), new Instant(100)),
+ TimestampedValue.of(KV.of("B", "bar"), new Instant(300)),
+ TimestampedValue.of(KV.of("A", "baz"), new Instant(200))
+ ))
+ .apply(Latest.<String, String>perKey());
+
+ PAssert.that(output).containsInAnyOrder(KV.of("B", "bar"), KV.of("A", "baz"));
+ p.run();
+ }
+
+ @Test
+ public void testPerKeyOutputCoder() {
+ TestPipeline p = TestPipeline.create();
+ KvCoder<String, Long> inputCoder = KvCoder.of(
+ AvroCoder.of(String.class), AvroCoder.of(Long.class));
+
+ PCollection<KV<String, Long>> output =
+ p.apply(Create.of(KV.of("foo", 1L)).withCoder(inputCoder))
+ .apply(Latest.<String, Long>perKey());
+
+ assertEquals("Should use input coder for outputs", inputCoder, output.getCoder());
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testPerKeyEmptyCollection() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<KV<String, String>> output =
+ p.apply(Create.<KV<String, String>>of().withCoder(KvCoder.of(
+ StringUtf8Coder.of(), StringUtf8Coder.of())))
+ .apply(Latest.<String, String>perKey());
+
+ PAssert.that(output).empty();
+ p.run();
+ }
+
+ /** Helper method to easily create a timestamped value. */
+ private static TimestampedValue<Long> timestamped(Instant timestamp) {
+ return TimestampedValue.of(uniqueLong.incrementAndGet(), timestamp);
+ }
+ private static final AtomicLong uniqueLong = new AtomicLong();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
new file mode 100644
index 0000000..a982f31
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.testing.EqualsTester;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link TimestampedValue}.
+ */
+@RunWith(JUnit4.class)
+public class TimestampedValueTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testValues() {
+ Instant now = Instant.now();
+ TimestampedValue<String> tsv = TimestampedValue.of("foobar", now);
+
+ assertEquals(now, tsv.getTimestamp());
+ assertEquals("foobar", tsv.getValue());
+ }
+
+ @Test
+ public void testAtMinimumTimestamp() {
+ TimestampedValue<String> tsv = TimestampedValue.atMinimumTimestamp("foobar");
+ assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, tsv.getTimestamp());
+ }
+
+ @Test
+ public void testNullTimestamp() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("timestamp");
+ TimestampedValue.of("foobar", null);
+ }
+
+ @Test
+ public void testNullValue() {
+ TimestampedValue<String> tsv = TimestampedValue.atMinimumTimestamp(null);
+ assertEquals(null, tsv.getValue());
+ }
+
+ @Test
+ public void testEquality() {
+ new EqualsTester()
+ .addEqualityGroup(
+ TimestampedValue.of("foo", new Instant(1000)),
+ TimestampedValue.of("foo", new Instant(1000)))
+ .addEqualityGroup(TimestampedValue.of("foo", new Instant(2000)))
+ .addEqualityGroup(TimestampedValue.of("bar", new Instant(1000)))
+ .addEqualityGroup(
+ TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
+ TimestampedValue.atMinimumTimestamp("foo"))
+ .testEquals();
+ }
+}