You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:47 UTC

[16/50] [abbrv] incubator-beam git commit: Add Latest CombineFn and PTransforms

Add Latest CombineFn and PTransforms

Add DoFnTester support for specifying input timestamps


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ee7b620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ee7b620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ee7b620

Branch: refs/heads/gearpump-runner
Commit: 6ee7b620bf8e2ee07c0f30e9ff20363e79765405
Parents: 28ad44d
Author: Scott Wegner <sw...@google.com>
Authored: Thu Aug 18 13:56:34 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/NullableCoder.java   |   7 +
 .../apache/beam/sdk/transforms/DoFnTester.java  |  33 ++-
 .../org/apache/beam/sdk/transforms/Latest.java  | 203 ++++++++++++++++
 .../beam/sdk/values/TimestampedValue.java       |  14 ++
 .../beam/sdk/transforms/DoFnTesterTest.java     |  34 ++-
 .../beam/sdk/transforms/LatestFnTests.java      | 233 +++++++++++++++++++
 .../apache/beam/sdk/transforms/LatestTest.java  | 146 ++++++++++++
 .../beam/sdk/values/TimestampedValueTest.java   |  83 +++++++
 8 files changed, 747 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 44aadbd..9c6c7c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -65,6 +65,13 @@ public class NullableCoder<T> extends StandardCoder<T> {
     this.valueCoder = valueCoder;
   }
 
+  /**
+   * Returns the inner {@link Coder} wrapped by this {@link NullableCoder} instance.
+   */
+  public Coder<T> getValueCoder() {
+    return valueCoder;
+  }
+
   @Override
   public void encode(@Nullable T value, OutputStream outStream, Context context)
       throws IOException, CoderException  {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b867a55..0e018ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
@@ -221,9 +224,26 @@ public class DoFnTester<InputT, OutputT> {
    * been finished
    */
   public void processElement(InputT element) throws Exception {
-    if (state == State.FINISHED) {
-      throw new IllegalStateException("finishBundle() has already been called");
-    }
+    processTimestampedElement(TimestampedValue.atMinimumTimestamp(element));
+  }
+
+  /**
+   * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
+   * context where {@link OldDoFn.ProcessContext#element} returns the
+   * given element and timestamp.
+   *
+   * <p>Will call {@link #startBundle} automatically, if it hasn't
+   * already been called.
+   *
+   * <p>If the input timestamp is {@literal null}, the minimum timestamp will be used.
+   *
+   * @throws IllegalStateException if the {@code OldDoFn} under test has already
+   * been finished
+   */
+  public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception {
+    checkNotNull(element, "Timestamped element cannot be null");
+    checkState(state != State.FINISHED, "finishBundle() has already been called");
+
     if (state == State.UNSTARTED) {
       startBundle();
     }
@@ -522,10 +542,13 @@ public class DoFnTester<InputT, OutputT> {
 
   private TestProcessContext<InputT, OutputT> createProcessContext(
       OldDoFn<InputT, OutputT> fn,
-      InputT elem) {
+      TimestampedValue<InputT> elem) {
+    WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
+        elem.getValue(), elem.getTimestamp());
+
     return new TestProcessContext<>(fn,
         createContext(fn),
-        WindowedValue.valueInGlobalWindow(elem),
+        windowedValue,
         mainOutputTag,
         sideInputs);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
new file mode 100644
index 0000000..7f13649
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Iterator;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * {@link PTransform} and {@link Combine.CombineFn} for computing the latest element
+ * in a {@link PCollection}.
+ *
+ * <p>Example 1: compute the latest value for each session:
+ * <pre><code>
+ * PCollection<Long> input = ...;
+ * PCollection<Long> sessioned = input
+ *    .apply(Window.<Long>into(Sessions.withGapDuration(Duration.standardMinutes(5)));
+ * PCollection<Long> latestValues = sessioned.apply(Latest.<Long>globally());
+ * </code></pre>
+ *
+ * <p>Example 2: track a latest computed value in an aggregator:
+ * <pre><code>
+ * class MyDoFn extends DoFn<String, String> {
+ *  private Aggregator<TimestampedValue<Double>, Double> latestValue =
+ *    createAggregator("latestValue", new Latest.LatestFn<Double>());
+ *
+ *  {@literal @}ProcessElement
+ *  public void processElement(ProcessContext c) {
+ *    double val = // ..
+ *    latestValue.addValue(TimestampedValue.of(val, c.timestamp()));
+ *    // ..
+ *  }
+ * }
+ * </code></pre>
+ *
+ * <p>For elements with the same timestamp, the element chosen for output is arbitrary.
+ */
+public class Latest {
+  // Do not instantiate
+  private Latest() {}
+
+  /**
+   * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is
+   * particularly useful as an {@link Aggregator}.
+   *
+   * @param <T> Type of input element.
+   * @see Latest
+   */
+  public static class LatestFn<T>
+      extends Combine.CombineFn<TimestampedValue<T>, TimestampedValue<T>, T> {
+    /** Construct a new {@link LatestFn} instance. */
+    public LatestFn() {}
+
+    @Override
+    public TimestampedValue<T> createAccumulator() {
+      return TimestampedValue.atMinimumTimestamp(null);
+    }
+
+    @Override
+    public TimestampedValue<T> addInput(TimestampedValue<T> accumulator,
+        TimestampedValue<T> input) {
+      checkNotNull(accumulator, "accumulator must be non-null");
+      checkNotNull(input, "input must be non-null");
+
+      if (input.getTimestamp().isBefore(accumulator.getTimestamp())) {
+        return accumulator;
+      } else {
+        return input;
+      }
+    }
+
+    @Override
+    public Coder<TimestampedValue<T>> getAccumulatorCoder(CoderRegistry registry,
+        Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
+      return NullableCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder(CoderRegistry registry,
+        Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
+      checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder,
+          "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder);
+
+      TimestampedValue.TimestampedValueCoder<T> inputTVCoder =
+          (TimestampedValue.TimestampedValueCoder<T>) inputCoder;
+      return NullableCoder.of(inputTVCoder.<T>getValueCoder());
+    }
+
+    @Override
+    public TimestampedValue<T> mergeAccumulators(Iterable<TimestampedValue<T>> accumulators) {
+      checkNotNull(accumulators, "accumulators must be non-null");
+
+      Iterator<TimestampedValue<T>> iter = accumulators.iterator();
+      if (!iter.hasNext()) {
+        return createAccumulator();
+      }
+
+      TimestampedValue<T> merged = iter.next();
+      while (iter.hasNext()) {
+        merged = addInput(merged, iter.next());
+      }
+
+      return merged;
+    }
+
+    @Override
+    public T extractOutput(TimestampedValue<T> accumulator) {
+      return accumulator.getValue();
+    }
+  }
+
+  /**
+   * Returns a {@link PTransform} that takes as input a {@link PCollection<T>} and returns a
+   * {@link PCollection<T>} whose contents is the latest element according to its event time, or
+   * {@literal null} if there are no elements.
+   *
+   * @param <T> The type of the elements being combined.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> globally() {
+    return new Globally<>();
+  }
+
+  /**
+   * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a
+   * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key according to its
+   * event time.
+   *
+   * @param <K> The key type of the elements being combined.
+   * @param <V> The value type of the elements being combined.
+   */
+  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() {
+    return new PerKey<>();
+  }
+
+  private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      Coder<T> inputCoder = input.getCoder();
+
+      return input
+          .apply("Reify Timestamps", ParDo.of(
+            new DoFn<T, TimestampedValue<T>>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                c.output(TimestampedValue.of(c.element(), c.timestamp()));
+              }
+            })).setCoder(TimestampedValue.TimestampedValueCoder.of(inputCoder))
+          .apply("Latest Value", Combine.globally(new LatestFn<T>()))
+            .setCoder(NullableCoder.of(inputCoder));
+    }
+  }
+
+  private static class PerKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) {
+      checkNotNull(input);
+      checkArgument(input.getCoder() instanceof KvCoder,
+          "Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder());
+
+      @SuppressWarnings("unchecked")
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      return input
+          .apply("Reify Timestamps", ParDo.of(
+            new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                c.output(KV.of(c.element().getKey(), TimestampedValue.of(c.element().getValue(),
+                    c.timestamp())));
+              }
+            })).setCoder(KvCoder.of(
+                inputCoder.getKeyCoder(),
+                TimestampedValue.TimestampedValueCoder.of(inputCoder.getValueCoder())))
+          .apply("Latest Value", Combine.<K, TimestampedValue<V>, V>perKey(new LatestFn<V>()))
+            .setCoder(inputCoder);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index f2ad616..dd80fb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.joda.time.Instant;
 
@@ -43,6 +44,13 @@ import org.joda.time.Instant;
  * @param <V> the type of the value
  */
 public class TimestampedValue<V> {
+  /**
+   * Returns a new {@link TimestampedValue} with the
+   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE minimum timestamp}.
+   */
+  public static <V> TimestampedValue<V> atMinimumTimestamp(V value) {
+    return of(value, BoundedWindow.TIMESTAMP_MIN_VALUE);
+  }
 
   /**
    * Returns a new {@code TimestampedValue} with the given value and timestamp.
@@ -136,6 +144,10 @@ public class TimestampedValue<V> {
       return Arrays.<Coder<?>>asList(valueCoder);
     }
 
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
     public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
       return Arrays.<Object>asList(exampleValue.getValue());
     }
@@ -147,6 +159,8 @@ public class TimestampedValue<V> {
   private final Instant timestamp;
 
   protected TimestampedValue(V value, Instant timestamp) {
+    checkNotNull(timestamp, "timestamp must be non-null");
+
     this.value = value;
     this.timestamp = timestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 2649be5..3ed30fd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
@@ -35,7 +36,9 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -44,6 +47,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class DoFnTesterTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void processElement() throws Exception {
@@ -126,6 +130,16 @@ public class DoFnTesterTest {
   }
 
   @Test
+  public void processElementAfterFinish() throws Exception {
+    DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn());
+    tester.finishBundle();
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("finishBundle() has already been called");
+    tester.processElement(1L);
+  }
+
+  @Test
   public void processBatch() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
@@ -145,7 +159,25 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void processElementWithTimestamp() throws Exception {
+  public void processTimestampedElement() throws Exception {
+    DoFn<Long, TimestampedValue<Long>> reifyTimestamps = new ReifyTimestamps();
+
+    DoFnTester<Long, TimestampedValue<Long>> tester = DoFnTester.of(reifyTimestamps);
+
+    TimestampedValue<Long> input = TimestampedValue.of(1L, new Instant(100));
+    tester.processTimestampedElement(input);
+    assertThat(tester.takeOutputElements(), contains(input));
+  }
+
+  static class ReifyTimestamps extends DoFn<Long, TimestampedValue<Long>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(TimestampedValue.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  public void processElementWithOutputTimestamp() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
new file mode 100644
index 0000000..84b5b68
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link Latest.LatestFn}.
+ * */
+@RunWith(JUnit4.class)
+public class LatestFnTests {
+  private static final Instant INSTANT = new Instant(100);
+  private static final long VALUE = 100 * INSTANT.getMillis();
+
+  private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
+  private static final TimestampedValue<Long> TV_MINUS_TEN =
+      TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
+  private static final TimestampedValue<Long> TV_PLUS_TEN =
+      TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+
+  private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+  private final Instant baseTimestamp = Instant.now();
+
+  @Test
+  public void testDefaultValue() {
+    assertThat(fn.defaultValue(), nullValue());
+  }
+
+  @Test
+  public void testCreateAccumulator() {
+    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
+  }
+
+  @Test
+  public void testAddInputInitialAdd() {
+    TimestampedValue<Long> input = TV;
+    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+  }
+
+  @Test
+  public void testAddInputMinTimestamp() {
+    TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
+    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+  }
+
+  @Test
+  public void testAddInputEarlierValue() {
+    assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
+  }
+
+  @Test
+  public void testAddInputLaterValue() {
+    assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
+  }
+
+  @Test
+  public void testAddInputSameTimestamp() {
+    TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
+    TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
+
+    assertThat("Latest for values with the same timestamp is chosen arbitrarily",
+        fn.addInput(accum, input), isOneOf(accum, input));
+  }
+
+  @Test
+  public void testAddInputNullAccumulator() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("accumulators");
+    fn.addInput(null, TV);
+  }
+
+  @Test
+  public void testAddInputNullInput() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("input");
+    fn.addInput(TV, null);
+  }
+
+  @Test
+  public void testAddInputNullValue() {
+    TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
+    assertEquals("Null values are allowed", input, fn.addInput(TV, input));
+  }
+
+  @Test
+  public void testMergeAccumulatorsMultipleValues() {
+    Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
+        TV,
+        TV_PLUS_TEN,
+        TV_MINUS_TEN
+    );
+
+    assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
+  }
+
+  @Test
+  public void testMergeAccumulatorsSingleValue() {
+    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
+  }
+
+  @Test
+  public void testMergeAccumulatorsEmptyIterable() {
+    ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
+    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
+  }
+
+  @Test
+  public void testMergeAccumulatorsDefaultAccumulator() {
+    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
+  }
+
+  @Test
+  public void testMergeAccumulatorsAllDefaultAccumulators() {
+    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+    assertEquals(defaultAccum, fn.mergeAccumulators(
+        Lists.newArrayList(defaultAccum, defaultAccum)));
+  }
+
+  @Test
+  public void testMergeAccumulatorsNullIterable() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("accumulators");
+    fn.mergeAccumulators(null);
+  }
+
+  @Test
+  public void testExtractOutput() {
+    assertEquals(TV.getValue(), fn.extractOutput(TV));
+  }
+
+  @Test
+  public void testExtractOutputDefaultAggregator() {
+    TimestampedValue<Long> accum = fn.createAccumulator();
+    assertThat(fn.extractOutput(accum), nullValue());
+  }
+
+  @Test
+  public void testExtractOutputNullValue() {
+    TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
+    assertEquals(null, fn.extractOutput(accum));
+  }
+
+  @Test
+  public void testAggregator() throws Exception {
+    LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
+    DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
+    for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
+      harness.processTimestampedElement(element);
+    }
+
+    assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
+    assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
+    assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
+  }
+
+  @Test
+  public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
+    Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+
+    CoderRegistry registry = new CoderRegistry();
+    TimestampedValue.TimestampedValueCoder<Long> inputCoder =
+        TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
+
+    assertThat("Default output coder should handle null values",
+        fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+    assertThat("Default accumulator coder should handle null values",
+        fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+  }
+
+  static class LatestAggregatorsFn<T> extends DoFn<T, T> {
+    private final T specialValue;
+    LatestAggregatorsFn(T specialValue) {
+      this.specialValue = specialValue;
+    }
+
+    Aggregator<TimestampedValue<T>, T> allValuesAgg =
+        createAggregator("allValues", new Latest.LatestFn<T>());
+
+    Aggregator<TimestampedValue<T>, T> specialValueAgg =
+        createAggregator("oneValue", new Latest.LatestFn<T>());
+
+    Aggregator<TimestampedValue<T>, T> noValuesAgg =
+        createAggregator("noValues", new Latest.LatestFn<T>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
+      allValuesAgg.addValue(val);
+      if (Objects.equals(c.element(), specialValue)) {
+        specialValueAgg.addValue(val);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
new file mode 100644
index 0000000..ce9ae37
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link Latest} {@link PTransform} and {@link Combine.CombineFn}.
+ */
+@RunWith(JUnit4.class)
+public class LatestTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGloballyEventTimestamp() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<String> output =
+        p.apply(Create.timestamped(
+            TimestampedValue.of("foo", new Instant(100)),
+            TimestampedValue.of("bar", new Instant(300)),
+            TimestampedValue.of("baz", new Instant(200))
+        ))
+        .apply(Latest.<String>globally());
+
+    PAssert.that(output).containsInAnyOrder("bar");
+    p.run();
+  }
+
+  @Test
+  public void testGloballyOutputCoder() {
+    TestPipeline p = TestPipeline.create();
+    BigEndianLongCoder inputCoder = BigEndianLongCoder.of();
+
+    PCollection<Long> output =
+        p.apply(Create.of(1L, 2L).withCoder(inputCoder))
+            .apply(Latest.<Long>globally());
+
+    Coder<Long> outputCoder = output.getCoder();
+    assertThat(outputCoder, instanceOf(NullableCoder.class));
+    assertEquals(inputCoder, ((NullableCoder<?>) outputCoder).getValueCoder());
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGloballyEmptyCollection() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> emptyInput = p.apply(Create.<Long>of()
+        // Explicitly set coder such that then runner enforces encodability.
+        .withCoder(VarLongCoder.of()));
+    PCollection<Long> output = emptyInput.apply(Latest.<Long>globally());
+
+    PAssert.that(output).containsInAnyOrder((Long) null);
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testPerKeyEventTimestamp() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<KV<String, String>> output =
+        p.apply(Create.timestamped(
+            TimestampedValue.of(KV.of("A", "foo"), new Instant(100)),
+            TimestampedValue.of(KV.of("B", "bar"), new Instant(300)),
+            TimestampedValue.of(KV.of("A", "baz"), new Instant(200))
+        ))
+            .apply(Latest.<String, String>perKey());
+
+    PAssert.that(output).containsInAnyOrder(KV.of("B", "bar"), KV.of("A", "baz"));
+    p.run();
+  }
+
+  @Test
+  public void testPerKeyOutputCoder() {
+    TestPipeline p = TestPipeline.create();
+    KvCoder<String, Long> inputCoder = KvCoder.of(
+        AvroCoder.of(String.class), AvroCoder.of(Long.class));
+
+    PCollection<KV<String, Long>> output =
+        p.apply(Create.of(KV.of("foo", 1L)).withCoder(inputCoder))
+            .apply(Latest.<String, Long>perKey());
+
+    assertEquals("Should use input coder for outputs", inputCoder, output.getCoder());
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testPerKeyEmptyCollection() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<KV<String, String>> output =
+        p.apply(Create.<KV<String, String>>of().withCoder(KvCoder.of(
+            StringUtf8Coder.of(), StringUtf8Coder.of())))
+         .apply(Latest.<String, String>perKey());
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  /** Helper method to easily create a timestamped value. */
+  private static TimestampedValue<Long> timestamped(Instant timestamp) {
+    return TimestampedValue.of(uniqueLong.incrementAndGet(), timestamp);
+  }
+  private static final AtomicLong uniqueLong = new AtomicLong();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
new file mode 100644
index 0000000..a982f31
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
@@ -0,0 +1,83 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.testing.EqualsTester;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link TimestampedValue}.
+ */
+@RunWith(JUnit4.class)
+public class TimestampedValueTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testValues() {
+    Instant now = Instant.now();
+    TimestampedValue<String> tsv = TimestampedValue.of("foobar", now);
+
+    assertEquals(now, tsv.getTimestamp());
+    assertEquals("foobar", tsv.getValue());
+  }
+
+  @Test
+  public void testAtMinimumTimestamp() {
+    TimestampedValue<String> tsv = TimestampedValue.atMinimumTimestamp("foobar");
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, tsv.getTimestamp());
+  }
+
+  @Test
+  public void testNullTimestamp() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("timestamp");
+    TimestampedValue.of("foobar", null);
+  }
+
+  @Test
+  public void testNullValue() {
+    TimestampedValue<String> tsv = TimestampedValue.atMinimumTimestamp(null);
+    assertEquals(null, tsv.getValue());
+  }
+
+  @Test
+  public void testEquality() {
+    new EqualsTester()
+        .addEqualityGroup(
+            TimestampedValue.of("foo", new Instant(1000)),
+            TimestampedValue.of("foo", new Instant(1000)))
+        .addEqualityGroup(TimestampedValue.of("foo", new Instant(2000)))
+        .addEqualityGroup(TimestampedValue.of("bar", new Instant(1000)))
+        .addEqualityGroup(
+            TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
+            TimestampedValue.atMinimumTimestamp("foo"))
+        .testEquals();
+  }
+}