You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/28 02:28:15 UTC

[6/7] incubator-beam git commit: [BEAM-151] Move over some more Dataflow specific classes.

[BEAM-151] Move over some more Dataflow specific classes.

Note that users should use proto ByteString instead of RandomAccessData
since it provides a safer version of the same functionality.

I hoped that I would be able to move over more of the *Cloud* classes
and their helpers, but they are embedded as part of the coders. Nothing more
can be done here until there is an official Beam representation of a coder
decoupled from Dataflow CloudKnownTypes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27979d76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27979d76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27979d76

Branch: refs/heads/master
Commit: 27979d76666ea8a5ea2d5efe1ac3159cebaeec97
Parents: 81bf4d9
Author: Luke Cwik <lc...@google.com>
Authored: Tue Apr 26 11:43:26 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 27 17:26:30 2016 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   6 +
 .../beam/sdk/runners/worker/IsmFormat.java      | 811 +++++++++++++++++++
 .../beam/sdk/runners/worker/package-info.java   |  25 +
 .../java/org/apache/beam/sdk/util/DoFnInfo.java |  68 ++
 .../apache/beam/sdk/util/OutputReference.java   |  43 +
 .../apache/beam/sdk/util/RandomAccessData.java  | 355 ++++++++
 .../java/org/apache/beam/sdk/util/TimeUtil.java | 166 ++++
 .../beam/sdk/util/RandomAccessDataTest.java     | 207 +++++
 .../org/apache/beam/sdk/util/TimeUtilTest.java  |  75 ++
 .../beam/sdk/runners/worker/IsmFormat.java      | 811 -------------------
 .../beam/sdk/runners/worker/package-info.java   |  25 -
 .../java/org/apache/beam/sdk/util/DoFnInfo.java |  68 --
 .../apache/beam/sdk/util/OutputReference.java   |  43 -
 .../apache/beam/sdk/util/RandomAccessData.java  | 355 --------
 .../java/org/apache/beam/sdk/util/TimeUtil.java | 166 ----
 .../sdk/util/common/worker/package-info.java    |  19 -
 .../beam/sdk/util/RandomAccessDataTest.java     | 207 -----
 .../org/apache/beam/sdk/util/TimeUtilTest.java  |  75 --
 18 files changed, 1756 insertions(+), 1769 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index beb340c..23b7f5f 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -524,6 +524,12 @@
       <version>1.0-rc2</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <version>1.1</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- test dependencies -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
new file mode 100644
index 0000000..8df46dd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.runners.worker;
+
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.RandomAccessData;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * An Ism file is a prefix encoded composite key value file broken into shards. Each composite
+ * key is composed of a fixed number of component keys. A fixed number of those sub keys represent
+ * the shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details
+ * around the data format. In addition to the data, there is a bloom filter,
+ * and multiple indices to allow for efficient retrieval.
+ *
+ * <p>An Ism file is composed of these high level sections (in order):
+ * <ul>
+ *   <li>shard block</li>
+ *   <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
+ *   <li>shard index</li>
+ *   <li>footer (See {@link Footer} for details on encoding format)</li>
+ * </ul>
+ *
+ * <p>The shard block is composed of multiple copies of the following:
+ * <ul>
+ *   <li>data block</li>
+ *   <li>data index</li>
+ * </ul>
+ *
+ * <p>The data block is composed of multiple copies of the following:
+ * <ul>
+ *   <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
+ *   <li>unshared key bytes</li>
+ *   <li>value bytes</li>
+ *   <li>optional 0x00 0x00 marker bytes followed by metadata bytes
+ *       (if the 0x00 0x00 marker bytes are not present, then there are no metadata bytes)</li>
+ * </ul>
+ * Each key written into the data block must be in unsigned lexicographically increasing order
+ * and also its shard portion of the key must hash to the same shard id as all other keys
+ * within the same data block. The hashing function used is the
+ * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
+ * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
+ * using {@code 1225801234} as the seed value.
+ *
+ * <p>The data index is composed of {@code N} copies of the following:
+ * <ul>
+ *   <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
+ *   <li>unshared key bytes</li>
+ *   <li>byte offset to key prefix in data block (variable length long coding)</li>
+ * </ul>
+ *
+ * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
+ * the number of shard index records followed by that many shard index records.
+ * See {@link IsmShardCoder} for further details as to its encoding scheme.
+ */
+public class IsmFormat {
+  private static final int HASH_SEED = 1225801234;
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
+  public static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
+
+  /**
+   * A record containing a composite key and either a value or metadata. The composite key
+   * must not contain the metadata key component place holder if producing a value record, and must
+   * contain the metadata component key place holder if producing a metadata record.
+   *
+   * <p>The composite key is a fixed number of component keys where the first {@code N} component
+   * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
+   * further details.
+   */
+  @AutoValue
+  public abstract static class IsmRecord<V> {
+    abstract List<?> keyComponents();
+    @Nullable abstract V value();
+    @Nullable abstract byte[] metadata();
+
+    IsmRecord() {} // Prevent public constructor
+
+    /** Returns an IsmRecord with the specified key components and value. */
+    public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
+      checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
+      checkArgument(!isMetadataKey(keyComponents),
+          "Expected key components to not contain metadata key.");
+      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
+    }
+
+    public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
+      checkNotNull(metadata);
+      checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
+      checkArgument(isMetadataKey(keyComponents),
+          "Expected key components to contain metadata key.");
+      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
+    }
+
+    /** Returns the list of key components. */
+    public List<?> getKeyComponents() {
+      return keyComponents();
+    }
+
+    /** Returns the key component at the specified index. */
+    public Object getKeyComponent(int index) {
+      return keyComponents().get(index);
+    }
+
+    /**
+     * Returns the value. Throws {@link IllegalStateException} if this is not a
+     * value record.
+     */
+    public V getValue() {
+      checkState(!isMetadataKey(keyComponents()),
+          "This is a metadata record and not a value record.");
+      return value();
+    }
+
+    /**
+     * Returns the metadata. Throws {@link IllegalStateException} if this is not a
+     * metadata record.
+     */
+    public byte[] getMetadata() {
+      checkState(isMetadataKey(keyComponents()),
+          "This is a value record and not a metadata record.");
+      return metadata();
+    }
+  }
+
+  /** A {@link Coder} for {@link IsmRecord}s.
+   *
+   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
+   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
+   * is combined with an {@link IsmSink} will one produce an Ism file.
+   *
+   * <p>The {@link IsmRecord} encoded format is:
+   * <ul>
+   *   <li>encoded key component 1 using key component coder 1</li>
+   *   <li>...</li>
+   *   <li>encoded key component N using key component coder N</li>
+   *   <li>encoded value using value coder</li>
+   * </ul>
+   */
+  public static class IsmRecordCoder<V>
+      extends StandardCoder<IsmRecord<V>> {
+    /** Returns an IsmRecordCoder with the specified key component coders, value coder. */
+    public static <V> IsmRecordCoder<V> of(
+        int numberOfShardKeyCoders,
+        int numberOfMetadataShardKeyCoders,
+        List<Coder<?>> keyComponentCoders,
+        Coder<V> valueCoder) {
+      checkNotNull(keyComponentCoders);
+      checkArgument(keyComponentCoders.size() > 0);
+      checkArgument(numberOfShardKeyCoders > 0);
+      checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size());
+      checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size());
+      return new IsmRecordCoder<>(
+          numberOfShardKeyCoders,
+          numberOfMetadataShardKeyCoders,
+          keyComponentCoders,
+          valueCoder);
+    }
+
+    /**
+     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
+     * to be called by users but used by Jackson when decoding this coder.
+     */
+    @JsonCreator
+    public static IsmRecordCoder<?> of(
+        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
+        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+      Preconditions.checkArgument(components.size() >= 2,
+          "Expecting at least 2 components, got " + components.size());
+      return of(
+          numberOfShardCoders,
+          numberOfMetadataShardCoders,
+          components.subList(0, components.size() - 1),
+          components.get(components.size() - 1));
+    }
+
+    private final int numberOfShardKeyCoders;
+    private final int numberOfMetadataShardKeyCoders;
+    private final List<Coder<?>> keyComponentCoders;
+    private final Coder<V> valueCoder;
+
+    private IsmRecordCoder(
+        int numberOfShardKeyCoders,
+        int numberOfMetadataShardKeyCoders,
+        List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
+      this.numberOfShardKeyCoders = numberOfShardKeyCoders;
+      this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
+      this.keyComponentCoders = keyComponentCoders;
+      this.valueCoder = valueCoder;
+    }
+
+    /** Returns the list of key component coders. */
+    public List<Coder<?>> getKeyComponentCoders() {
+      return keyComponentCoders;
+    }
+
+    /** Returns the key coder at the specified index. */
+    public Coder getKeyComponentCoder(int index) {
+      return keyComponentCoders.get(index);
+    }
+
+    /** Returns the value coder. */
+    public Coder<V> getValueCoder() {
+      return valueCoder;
+    }
+
+    @Override
+    public void encode(IsmRecord<V> value, OutputStream outStream,
+        Coder.Context context) throws CoderException, IOException {
+      if (value.getKeyComponents().size() != keyComponentCoders.size()) {
+        throw new CoderException(String.format(
+            "Expected %s key component(s) but received key component(s) %s.",
+            keyComponentCoders.size(), value.getKeyComponents()));
+      }
+      for (int i = 0; i < keyComponentCoders.size(); ++i) {
+        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
+      }
+      if (isMetadataKey(value.getKeyComponents())) {
+        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
+      } else {
+        valueCoder.encode(value.getValue(), outStream, context.nested());
+      }
+    }
+
+    @Override
+    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
+      for (Coder<?> keyCoder : keyComponentCoders) {
+        keyComponents.add(keyCoder.decode(inStream, context.nested()));
+      }
+      if (isMetadataKey(keyComponents)) {
+        return IsmRecord.<V>meta(
+            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
+      } else {
+        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
+      }
+    }
+
+    public int getNumberOfShardKeyCoders(List<?> keyComponents) {
+      if (isMetadataKey(keyComponents)) {
+        return numberOfMetadataShardKeyCoders;
+      } else {
+        return numberOfShardKeyCoders;
+      }
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * The shard keys are encoded into their byte representations and hashed using the
+     * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
+     * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
+     * using {@code 1225801234} as the seed value. We ensure that shard ids for
+     * metadata keys and normal keys do not overlap.
+     */
+    public <V, T> int hash(List<?> keyComponents) {
+      return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * Mutates {@code keyBytesToMutate} such that when returned, it contains the encoded
+     * version of the key components.
+     */
+    public <V, T> int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
+      return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
+    }
+
+    /**
+     * Returns the shard id for the given key component(s), encoding them into
+     * {@code keyBytesToMutate} and recording in {@code keyComponentByteOffsetsToMutate} the
+     * size of {@code keyBytesToMutate} right after each key component has been encoded.
+     *
+     * @throws IllegalArgumentException if the number of key components is out of range
+     * @throws IllegalStateException if encoding a key component fails with an IOException
+     */
+    public <V, T> int encodeAndHash(
+        List<?> keyComponents,
+        RandomAccessData keyBytesToMutate,
+        List<Integer> keyComponentByteOffsetsToMutate) {
+      checkNotNull(keyComponents);
+      checkArgument(keyComponents.size() <= keyComponentCoders.size(),
+          "Expected at most %s key component(s) but received %s.",
+          keyComponentCoders.size(), keyComponents);
+      // Metadata keys hash on their own shard-key count and land in shards [128-255].
+      final int numberOfKeyCodersToUse;
+      final int shardOffset;
+      if (isMetadataKey(keyComponents)) {
+        numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
+        shardOffset = SHARD_BITS + 1;
+      } else {
+        numberOfKeyCodersToUse = numberOfShardKeyCoders;
+        shardOffset = 0;
+      }
+      // Report the count that applies to this kind of key, not always the value-key count.
+      checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
+          "Expected at least %s key component(s) but received %s.",
+          numberOfKeyCodersToUse, keyComponents);
+
+      try {
+        // Encode the shard portion
+        for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
+          getKeyComponentCoder(i).encode(
+              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
+          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
+        }
+        int rval = HASH_FUNCTION.hashBytes(
+            keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
+        rval += shardOffset;
+
+        // Encode the remainder
+        for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
+          getKeyComponentCoder(i).encode(
+              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
+          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
+        }
+        return rval;
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            String.format("Failed to hash %s with coder %s", keyComponents, this), e);
+      }
+    }
+
+    @Override
+    public List<Coder<?>> getCoderArguments() {
+      return ImmutableList.<Coder<?>>builder()
+          .addAll(keyComponentCoders)
+          .add(valueCoder)
+          .build();
+    }
+
+    @Override
+    public CloudObject asCloudObject() {
+      CloudObject cloudObject = super.asCloudObject();
+      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
+      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
+      return cloudObject;
+    }
+
+    @Override
+    public void verifyDeterministic() throws Coder.NonDeterministicException {
+      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
+      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      for (Coder<?> keyComponentCoder : keyComponentCoders) {
+        if (!keyComponentCoder.consistentWithEquals()) {
+          return false;
+        }
+      }
+      return valueCoder.consistentWithEquals();
+    }
+
+    @Override
+    public Object structuralValue(IsmRecord<V> record) throws Exception {
+      if (record == null) {
+        // Must precede any dereference of record; defer to the superclass for null.
+        return super.structuralValue(record);
+      }
+      checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
+          "Expected the number of key component coders %s "
+          + "to match the number of key components %s.",
+          keyComponentCoders.size(), record.getKeyComponents());
+      if (consistentWithEquals()) {
+        List<Object> structuralKeys = new ArrayList<>(keyComponentCoders.size());
+        for (int i = 0; i < keyComponentCoders.size(); ++i) {
+          structuralKeys.add(getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
+        }
+        if (isMetadataKey(record.getKeyComponents())) {
+          return IsmRecord.meta(structuralKeys, record.getMetadata());
+        }
+        return IsmRecord.of(structuralKeys, valueCoder.structuralValue(record.getValue()));
+      }
+      return super.structuralValue(record);
+    }
+  }
+
+  /**
+   * Validates that the key portion of the given coder is deterministic.
+   */
+  public static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
+    for (Coder<?> keyComponentCoder : coder.getKeyComponentCoders()) {
+      try {
+          keyComponentCoder.verifyDeterministic();
+      } catch (NonDeterministicException e) {
+        throw new IllegalArgumentException(
+            String.format("Key component coder %s is expected to be deterministic.",
+                keyComponentCoder), e);
+      }
+    }
+  }
+
+  /** Returns true if and only if any of the passed in key components represent a metadata key. */
+  public static boolean isMetadataKey(List<?> keyComponents) {
+    for (Object keyComponent : keyComponents) {
+      if (keyComponent == METADATA_KEY) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** A marker object representing the wildcard metadata key component. */
+  private static final Object METADATA_KEY = new Object() {
+    @Override
+    public String toString() {
+      return "META";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj;
+    }
+
+    @Override
+    public int hashCode() {
+      return -1248902349;
+    }
+  };
+
+  /**
+   * An object representing a wild card for a key component.
+   * Encoded using {@link MetadataKeyCoder}.
+   */
+  public static Object getMetadataKey() {
+    return METADATA_KEY;
+  }
+
+  /**
+   * A coder for metadata key component. Can be used to wrap key component coder allowing for
+   * the metadata key component to be used as a place holder instead of an actual key.
+   */
+  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
+    public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
+      checkNotNull(keyCoder);
+      return new MetadataKeyCoder<>(keyCoder);
+    }
+
+    /**
+     * Returns a MetadataKeyCoder with the specified coders. Note that this method is not meant
+     * to be called by users but used by Jackson when decoding this coder.
+     */
+    @JsonCreator
+    public static MetadataKeyCoder<?> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+      Preconditions.checkArgument(components.size() == 1,
+          "Expecting one component, got " + components.size());
+      return of(components.get(0));
+    }
+
+    private final Coder<K> keyCoder;
+
+    private MetadataKeyCoder(Coder<K> keyCoder) {
+      this.keyCoder = keyCoder;
+    }
+
+    public Coder<K> getKeyCoder() {
+      return keyCoder;
+    }
+
+    @Override
+    public void encode(K value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value == METADATA_KEY) {
+        outStream.write(0);
+      } else {
+        outStream.write(1);
+        keyCoder.encode(value, outStream, context.nested());
+      }
+    }
+
+    @Override
+    public K decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      int marker = inStream.read();
+      if (marker == 0) {
+        return (K) getMetadataKey();
+      } else if (marker == 1) {
+        return keyCoder.decode(inStream, context.nested());
+      } else {
+        throw new CoderException(String.format("Expected marker but got %s.", marker));
+      }
+    }
+
+    @Override
+    public List<Coder<?>> getCoderArguments() {
+      return ImmutableList.<Coder<?>>of(keyCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+    }
+  }
+
+  /**
+   * A shard descriptor containing shard id, the data block offset, and the index offset for the
+   * given shard.
+   */
+  @AutoValue
+  public abstract static class IsmShard {
+    abstract int id();
+    abstract long blockOffset();
+    abstract long indexOffset();
+
+    IsmShard() {}
+
+    /** Returns an IsmShard with the given id, block offset and no index offset. */
+    public static IsmShard of(int id, long blockOffset) {
+      IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, -1);
+      checkState(id >= 0,
+          "%s attempting to be written with negative shard id.",
+          ismShard);
+      checkState(blockOffset >= 0,
+          "%s attempting to be written with negative block offset.",
+          ismShard);
+      return ismShard;
+    }
+
+    /** Returns an IsmShard with the given id, block offset, and index offset. */
+    public static IsmShard of(int id, long blockOffset, long indexOffset) {
+      IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, indexOffset);
+      checkState(id >= 0,
+          "%s attempting to be written with negative shard id.",
+          ismShard);
+      checkState(blockOffset >= 0,
+          "%s attempting to be written with negative block offset.",
+          ismShard);
+      checkState(indexOffset >= 0,
+          "%s attempting to be written with negative index offset.",
+          ismShard);
+      return ismShard;
+    }
+
+    /** Return the shard id. */
+    public int getId() {
+      return id();
+    }
+
+    /** Return the absolute position within the Ism file where the data block begins. */
+    public long getBlockOffset() {
+      return blockOffset();
+    }
+
+    /**
+     * Return the absolute position within the Ism file where the index block begins.
+     * Throws {@link IllegalStateException} if the index offset was never specified.
+     */
+    public long getIndexOffset() {
+      checkState(indexOffset() >= 0,
+            "Unable to fetch index offset because it was never specified.");
+      return indexOffset();
+    }
+
+    /** Returns a new IsmShard like this one with the specified index offset. */
+    public IsmShard withIndexOffset(long indexOffset) {
+      return of(id(), blockOffset(), indexOffset);
+    }
+  }
+
+  /**
+   * A {@link ListCoder} wrapping a {@link IsmShardCoder} used to encode the shard index.
+   * See {@link ListCoder} for its encoding specification and {@link IsmShardCoder} for its
+   * encoding specification.
+   */
+  public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
+      ListCoder.of(IsmShardCoder.of());
+
+  /**
+   * A coder for {@link IsmShard}s.
+   *
+   * The shard descriptor is encoded as:
+   * <ul>
+   *   <li>id (variable length integer encoding)</li>
+   *   <li>blockOffset (variable length long encoding)</li>
+   *   <li>indexOffset (variable length long encoding)</li>
+   * </ul>
+   */
+  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
+    private static final IsmShardCoder INSTANCE = new IsmShardCoder();
+
+    /** Returns an IsmShardCoder. */
+    @JsonCreator
+    public static IsmShardCoder of() {
+      return INSTANCE;
+    }
+
+    private IsmShardCoder() {
+    }
+
+    @Override
+    public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Read the raw field: getIndexOffset() throws before this message could be reported.
+      checkState(value.indexOffset() >= 0,
+          "%s attempting to be written without index offset.", value);
+      VarIntCoder.of().encode(value.getId(), outStream, context.nested());
+      VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
+      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
+    }
+
+    @Override
+    public IsmShard decode(
+        InputStream inStream, Coder.Context context) throws CoderException, IOException {
+      return IsmShard.of(
+          VarIntCoder.of().decode(inStream, context),
+          VarLongCoder.of().decode(inStream, context),
+          VarLongCoder.of().decode(inStream, context));
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+  }
+
+  /**
+   * The prefix used before each key, which contains the number of key bytes shared with the
+   * previous key that was read and the number of unshared key bytes that follow. The key prefix,
+   * the previous key, and the unshared key bytes allow one to construct the current key as
+   * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
+   *
+   * <p>The key prefix is encoded as:
+   * <ul>
+   *   <li>number of shared key bytes (variable length integer coding)</li>
+   *   <li>number of unshared key bytes (variable length integer coding)</li>
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class KeyPrefix {
+    public abstract int getSharedKeySize();
+    public abstract int getUnsharedKeySize();
+
+    public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) {
+      return new AutoValue_IsmFormat_KeyPrefix(sharedKeySize, unsharedKeySize);
+    }
+  }
+
+  /** A {@link Coder} for {@link KeyPrefix}. */
+  public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
+    private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
+
+    @JsonCreator
+    public static KeyPrefixCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      VarInt.encode(value.getSharedKeySize(), outStream);
+      VarInt.encode(value.getUnsharedKeySize(), outStream);
+    }
+
+    @Override
+    public KeyPrefix decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
+      return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
+        throws Exception {
+      Preconditions.checkNotNull(value);
+      return VarInt.getLength(value.getSharedKeySize())
+          + VarInt.getLength(value.getUnsharedKeySize());
+    }
+  }
+
+  /**
+   * The footer stores the relevant information required to locate the index and bloom filter.
+   * It also stores a version byte and the number of keys stored.
+   *
+   * <p>The footer is encoded as the value containing:
+   * <ul>
+   *   <li>start of shard index position offset (big endian long coding)</li>
+   *   <li>start of bloom filter offset (big endian long coding)</li>
+   *   <li>number of keys in file (big endian long coding)</li>
+   *   <li>0x02 (version key as a single byte)</li>
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class Footer {
+    public static final int LONG_BYTES = 8;
+    public static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
+    public static final byte VERSION = 2;
+
+    public abstract byte getVersion();
+    public abstract long getIndexPosition();
+    public abstract long getBloomFilterPosition();
+    public abstract long getNumberOfKeys();
+
+    public static Footer of(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
+      return new AutoValue_IsmFormat_Footer(
+          VERSION, indexPosition, bloomFilterPosition, numberOfKeys);
+    }
+  }
+
+  /** A {@link Coder} for {@link Footer}. */
+  public static final class FooterCoder extends AtomicCoder<Footer> {
+    private static final FooterCoder INSTANCE = new FooterCoder();
+
+    @JsonCreator
+    public static FooterCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(Footer value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      DataOutputStream dataOut = new DataOutputStream(outStream);
+      dataOut.writeLong(value.getIndexPosition());
+      dataOut.writeLong(value.getBloomFilterPosition());
+      dataOut.writeLong(value.getNumberOfKeys());
+      dataOut.write(Footer.VERSION);
+    }
+
+    @Override
+    public Footer decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      DataInputStream dataIn = new DataInputStream(inStream);
+      Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());
+      int version = dataIn.read();
+      if (version != Footer.VERSION) {
+        throw new IOException("Unknown version " + version + ". "
+            + "Only version 2 is currently supported.");
+      }
+      return footer;
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
+      return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(Footer value, Coder.Context context)
+        throws Exception {
+      return Footer.FIXED_LENGTH;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
new file mode 100644
index 0000000..6133148
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementation of the harness that runs on each Google Compute Engine instance to coordinate
+ * execution of Pipeline code.
+ */
+@ParametersAreNonnullByDefault
+package org.apache.beam.sdk.runners.worker;
+
+import javax.annotation.ParametersAreNonnullByDefault;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
new file mode 100644
index 0000000..ae19a17
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.io.Serializable;
+
+/**
+ * Wrapper class holding the necessary information to serialize a DoFn.
+ *
+ * @param <InputT> the type of the (main) input elements of the DoFn
+ * @param <OutputT> the type of the (main) output elements of the DoFn
+ */
+public class DoFnInfo<InputT, OutputT> implements Serializable {
+  private final DoFn<InputT, OutputT> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final Iterable<PCollectionView<?>> sideInputViews;
+  private final Coder<InputT> inputCoder;
+
+  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
+    this.doFn = doFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputViews = null;
+    this.inputCoder = null;
+  }
+
+  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
+                  Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
+    this.doFn = doFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputViews = sideInputViews;
+    this.inputCoder = inputCoder;
+  }
+
+  public DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  public Iterable<PCollectionView<?>> getSideInputViews() {
+    return sideInputViews;
+  }
+
+  public Coder<InputT> getInputCoder() {
+    return inputCoder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
new file mode 100644
index 0000000..5e30172
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.api.client.util.Preconditions.checkNotNull;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.util.Key;
+
+/**
+ * A representation used by {@link com.google.api.services.dataflow.model.Step}s
+ * to reference the output of other {@code Step}s.
+ *
+ * <p>Every field carries a {@link Key} annotation naming the JSON key it is associated with.
+ * The two name fields are never read directly within this class.
+ */
+public final class OutputReference extends GenericJson {
+  /** Fixed type tag, associated with the {@code "@type"} key. */
+  @Key("@type")
+  public final String type = "OutputReference";
+
+  /** Step name given to the constructor, associated with the {@code "step_name"} key. */
+  @Key("step_name")
+  private final String stepName;
+
+  /** Output name given to the constructor, associated with the {@code "output_name"} key. */
+  @Key("output_name")
+  private final String outputName;
+
+  /**
+   * Creates a reference from a step name and an output name.
+   *
+   * @throws NullPointerException if either argument is null
+   */
+  public OutputReference(String stepName, String outputName) {
+    this.stepName = checkNotNull(stepName);
+    this.outputName = checkNotNull(outputName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
new file mode 100644
index 0000000..2d902f4
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.UnsignedBytes;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * An elastic-sized byte array which allows you to manipulate it as a stream, or access
+ * it directly. This allows for a quick succession of moving bytes from an {@link InputStream}
+ * to this wrapper to be used as an {@link OutputStream} and vice versa. This wrapper
+ * also provides random access to bytes stored within. This wrapper allows users to finely
+ * control the number of byte copies that occur.
+ *
+ * <p>Anything stored within the in-memory buffer from offset {@link #size()} is considered
+ * temporary unused storage.
+ */
+@NotThreadSafe
+public class RandomAccessData {
+  /**
+   * A {@link Coder} which encodes the valid parts of this stream.
+   * This follows the same encoding scheme as {@link ByteArrayCoder}.
+   * This coder is deterministic and consistent with equals.
+   *
+   * <p>This coder does not support encoding {@code null} or positive infinity.
+   */
+  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
+    private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
+
+    /** Returns the shared coder instance. */
+    @JsonCreator
+    public static RandomAccessDataCoder of() {
+      return INSTANCE;
+    }
+
+    /**
+     * Writes the valid bytes of {@code value}, preceded by their count as a var-int unless
+     * the context is the whole stream.
+     *
+     * @throws CoderException if {@code value} is {@code null} or positive infinity
+     */
+    @Override
+    public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value == null) {
+        // Report null the same way getEncodedElementByteSize does instead of failing with a
+        // bare NullPointerException on the field access below.
+        throw new CoderException("cannot encode a null in memory stream");
+      }
+      if (value == POSITIVE_INFINITY) {
+        throw new CoderException("Positive infinity can not be encoded.");
+      }
+      if (!context.isWholeStream) {
+        VarInt.encode(value.size, outStream);
+      }
+      value.writeTo(outStream, 0, value.size);
+    }
+
+    /**
+     * Reads a value written by {@link #encode}: either a var-int length followed by that many
+     * bytes, or, when the context is the whole stream, every remaining byte of the stream.
+     */
+    @Override
+    public RandomAccessData decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      RandomAccessData rval = new RandomAccessData();
+      if (!context.isWholeStream) {
+        int length = VarInt.decodeInt(inStream);
+        rval.readFrom(inStream, 0, length);
+      } else {
+        ByteStreams.copy(inStream, rval.asOutputStream());
+      }
+      return rval;
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(
+        RandomAccessData value, Coder.Context context) {
+      return true;
+    }
+
+    /**
+     * Returns the number of valid bytes in {@code value}, plus the size of the var-int length
+     * prefix unless the context is the whole stream.
+     *
+     * @throws CoderException if {@code value} is {@code null}
+     */
+    @Override
+    protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
+        throws Exception {
+      if (value == null) {
+        throw new CoderException("cannot encode a null in memory stream");
+      }
+      long size = 0;
+      if (!context.isWholeStream) {
+        size += VarInt.getLength(value.size);
+      }
+      return size + value.size;
+    }
+  }
+
+  /** The shared instance of {@link UnsignedLexicographicalComparator}. */
+  public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
+      new UnsignedLexicographicalComparator();
+
+  /**
+   * A {@link Comparator} that compares two {@link RandomAccessData} lexicographically,
+   * treating their valid bytes as unsigned values. The first position at which the two
+   * differ decides the order; if one is a prefix of the other, the shorter one is the lesser.
+   * For example,
+   * {@code [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY}.
+   *
+   * <p>Note that a token type of positive infinity is supported and is greater than
+   * all other {@link RandomAccessData}.
+   */
+  public static final class UnsignedLexicographicalComparator
+      implements Comparator<RandomAccessData> {
+    // Do not instantiate
+    private UnsignedLexicographicalComparator() {
+    }
+
+    @Override
+    public int compare(RandomAccessData o1, RandomAccessData o2) {
+      return compare(o1, o2, 0 /* start from the beginning */);
+    }
+
+    /**
+     * Compare the two sets of bytes starting at the given offset.
+     *
+     * <p>Bytes before {@code startOffset} are not examined. When no differing byte is found
+     * the result is decided by the total sizes, not by the sizes past the offset.
+     */
+    public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
+      if (o1 == o2) {
+        return 0;
+      }
+      if (o1 == POSITIVE_INFINITY) {
+        return 1;
+      }
+      if (o2 == POSITIVE_INFINITY) {
+        return -1;
+      }
+
+      int minBytesLen = Math.min(o1.size, o2.size);
+      for (int i = startOffset; i < minBytesLen; i++) {
+        // unsigned comparison
+        int b1 = o1.buffer[i] & 0xFF;
+        int b2 = o2.buffer[i] & 0xFF;
+        if (b1 == b2) {
+          continue;
+        }
+        // Return the stream with the smaller byte as the smaller value.
+        // Both operands are in [0, 255], so the subtraction cannot overflow.
+        return b1 - b2;
+      }
+      // If one is a prefix of the other, return the shorter one as the smaller one.
+      // If both lengths are equal, then both streams are equal.
+      // Sizes are non-negative, so this subtraction cannot overflow either.
+      return o1.size - o2.size;
+    }
+
+    /**
+     * Compute the length of the common prefix of the two provided sets of bytes.
+     *
+     * <p>Positive infinity is not treated specially here; it holds no valid bytes, so the
+     * result is 0 whenever it is one of the arguments.
+     */
+    public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
+      int minBytesLen = Math.min(o1.size, o2.size);
+      for (int i = 0; i < minBytesLen; i++) {
+        // unsigned comparison
+        int b1 = o1.buffer[i] & 0xFF;
+        int b2 = o2.buffer[i] & 0xFF;
+        if (b1 != b2) {
+          return i;
+        }
+      }
+      return minBytesLen;
+    }
+  }
+
+  /**
+   * A token type representing positive infinity. It is recognized by identity, not by
+   * content, so it must never be written to.
+   */
+  static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
+
+  /**
+   * Returns a copy of this RandomAccessData in which the last byte that is not 0xFF has been
+   * incremented by one; any 0xFF bytes that follow it are left unchanged. The result has the
+   * same length as this and is strictly greater than this. Note that if this is empty or is
+   * all 0xFF then a token value of positive infinity is returned.
+   *
+   * <p>The {@link UnsignedLexicographicalComparator} supports comparing
+   * {@link RandomAccessData} with support for positive infinity.
+   */
+  public RandomAccessData increment() throws IOException {
+    RandomAccessData copy = copy();
+    for (int i = copy.size - 1; i >= 0; --i) {
+      if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
+        copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
+        return copy;
+      }
+    }
+    return POSITIVE_INFINITY;
+  }
+
+  private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
+
+  /** Constructs a RandomAccessData with a default buffer size. */
+  public RandomAccessData() {
+    this(DEFAULT_INITIAL_BUFFER_SIZE);
+  }
+
+  /**
+   * Constructs a RandomAccessData with the initial buffer. The array is used directly, not
+   * copied, and all of its bytes are considered valid.
+   */
+  public RandomAccessData(byte[] initialBuffer) {
+    checkNotNull(initialBuffer);
+    this.buffer = initialBuffer;
+    this.size = initialBuffer.length;
+  }
+
+  /**
+   * Constructs an empty RandomAccessData with the given buffer size.
+   *
+   * @throws IllegalArgumentException if {@code initialBufferSize} is negative
+   */
+  public RandomAccessData(int initialBufferSize) {
+    // Zero is accepted (POSITIVE_INFINITY and copies of empty data rely on it), so the
+    // message must not claim that the size has to be greater than zero.
+    checkArgument(initialBufferSize >= 0, "Expected initial buffer size to be non-negative.");
+    this.buffer = new byte[initialBufferSize];
+  }
+
+  private byte[] buffer;
+  private int size;
+
+  /**
+   * Returns the backing array. Only the first {@link #size()} bytes are valid, and the array
+   * is replaced by a larger one whenever the buffer has to grow.
+   */
+  public byte[] array() {
+    return buffer;
+  }
+
+  /** Returns the number of bytes in the backing array that are valid. */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Resets the end of the stream to the specified position, growing the buffer if needed.
+   *
+   * @throws IllegalArgumentException if {@code position} is negative
+   */
+  public void resetTo(int position) {
+    // A negative position would otherwise be stored as a negative size.
+    checkArgument(position >= 0, "Expected position to be non-negative.");
+    ensureCapacity(position);
+    size = position;
+  }
+
+  /** Appends to the buffer at {@code size}; a single instance is shared by all callers. */
+  private final OutputStream outputStream = new OutputStream() {
+    @Override
+    public void write(int b) throws IOException {
+      ensureCapacity(size + 1);
+      buffer[size] = (byte) b;
+      size += 1;
+    }
+
+    @Override
+    public void write(byte[] b, int offset, int length) throws IOException {
+      ensureCapacity(size + length);
+      System.arraycopy(b, offset, buffer, size, length);
+      size += length;
+    }
+  };
+
+  /**
+   * Returns an output stream which writes to the backing buffer from the current position.
+   * Note that the internal buffer will grow as required to accommodate all data written.
+   */
+  public OutputStream asOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
+   * starting at {@code offset} and up to {@code length} bytes. Note that the returned
+   * {@link InputStream} is only a wrapper and any modifications to the underlying
+   * {@link RandomAccessData} will be visible by the {@link InputStream}.
+   */
+  public InputStream asInputStream(final int offset, final int length) {
+    return new ByteArrayInputStream(buffer, offset, length);
+  }
+
+  /**
+   * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
+   * specified output stream.
+   */
+  public void writeTo(OutputStream out, int offset, int length) throws IOException {
+    out.write(buffer, offset, length);
+  }
+
+  /**
+   * Reads {@code length} bytes from the specified input stream writing them into the backing
+   * data store starting at {@code offset}. Afterwards the size is {@code offset + length}.
+   *
+   * <p>Note that the in memory stream will be grown to ensure there is enough capacity.
+   */
+  public void readFrom(InputStream inStream, int offset, int length) throws IOException {
+    ensureCapacity(offset + length);
+    ByteStreams.readFully(inStream, buffer, offset, length);
+    size = offset + length;
+  }
+
+  /** Returns a copy of this RandomAccessData holding only the valid bytes. */
+  public RandomAccessData copy() throws IOException {
+    RandomAccessData copy = new RandomAccessData(size);
+    writeTo(copy.asOutputStream(), 0, size);
+    return copy;
+  }
+
+  /** Two instances are equal when the comparator above orders them as equal. */
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof RandomAccessData)) {
+      return false;
+    }
+    return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
+  }
+
+  /** Hashes only the valid bytes, so unused buffer capacity never affects the result. */
+  @Override
+  public int hashCode() {
+    int result = 1;
+    for (int i = 0; i < size; ++i) {
+      result = 31 * result + buffer[i];
+    }
+
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("buffer", Arrays.copyOf(buffer, size))
+        .add("size", size)
+        .toString();
+  }
+
+  /** Grows the backing array, if necessary, so that it holds at least {@code minCapacity}. */
+  private void ensureCapacity(int minCapacity) {
+    // If we have enough space, don't grow the buffer.
+    if (minCapacity <= buffer.length) {
+      return;
+    }
+
+    // Try to double the size of the buffer, if that's not enough, just use the new capacity.
+    // Note that we use Math.min(long, long) to not cause overflow on the multiplication.
+    int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
+    if (newCapacity < minCapacity) {
+      newCapacity = minCapacity;
+    }
+    buffer = Arrays.copyOf(buffer, newCapacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
new file mode 100644
index 0000000..db5c760
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDuration;
+import org.joda.time.ReadableInstant;
+import org.joda.time.chrono.ISOChronology;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for converting between Dataflow API and SDK time
+ * representations.
+ *
+ * <p>Dataflow API times are strings of the form
+ * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
+ * strings with optional fractional seconds and a 'Z' offset.
+ *
+ * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
+ * that is, seconds with optional fractional seconds and a literal 's' at the end.
+ *
+ * <p>In both formats, fractional seconds are either three digits (millisecond
+ * resolution), six digits (microsecond resolution), or nine digits (nanosecond
+ * resolution).
+ */
+public final class TimeUtil {
+  private TimeUtil() {}  // Non-instantiable.
+
+  private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)(?:\\.(\\d+))?s");
+  private static final Pattern TIME_PATTERN =
+      Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
+
+  /**
+   * Converts a {@link ReadableInstant} into a Dateflow API time value.
+   */
+  public static String toCloudTime(ReadableInstant instant) {
+    // Note that since Joda objects use millisecond resolution, we always
+    // produce either no fractional seconds or fractional seconds with
+    // millisecond resolution.
+
+    // Translate the ReadableInstant to a DateTime with ISOChronology.
+    DateTime time = new DateTime(instant);
+
+    int millis = time.getMillisOfSecond();
+    if (millis == 0) {
+      return String.format("%04d-%02d-%02dT%02d:%02d:%02dZ",
+          time.getYear(),
+          time.getMonthOfYear(),
+          time.getDayOfMonth(),
+          time.getHourOfDay(),
+          time.getMinuteOfHour(),
+          time.getSecondOfMinute());
+    } else {
+      return String.format("%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
+          time.getYear(),
+          time.getMonthOfYear(),
+          time.getDayOfMonth(),
+          time.getHourOfDay(),
+          time.getMinuteOfHour(),
+          time.getSecondOfMinute(),
+          millis);
+    }
+  }
+
+  /**
+   * Converts a time value received via the Dataflow API into the corresponding
+   * {@link Instant}.
+   * @return the parsed time, or null if a parse error occurs
+   */
+  @Nullable
+  public static Instant fromCloudTime(String time) {
+    Matcher matcher = TIME_PATTERN.matcher(time);
+    if (!matcher.matches()) {
+      return null;
+    }
+    int year = Integer.valueOf(matcher.group(1));
+    int month = Integer.valueOf(matcher.group(2));
+    int day = Integer.valueOf(matcher.group(3));
+    int hour = Integer.valueOf(matcher.group(4));
+    int minute = Integer.valueOf(matcher.group(5));
+    int second = Integer.valueOf(matcher.group(6));
+    int millis = 0;
+
+    String frac = matcher.group(7);
+    if (frac != null) {
+      int fracs = Integer.valueOf(frac);
+      if (frac.length() == 3) {  // millisecond resolution
+        millis = fracs;
+      } else if (frac.length() == 6) {  // microsecond resolution
+        millis = fracs / 1000;
+      } else if (frac.length() == 9) {  // nanosecond resolution
+        millis = fracs / 1000000;
+      } else {
+        return null;
+      }
+    }
+
+    return new DateTime(year, month, day, hour, minute, second, millis,
+        ISOChronology.getInstanceUTC()).toInstant();
+  }
+
+  /**
+   * Converts a {@link ReadableDuration} into a Dataflow API duration string.
+   */
+  public static String toCloudDuration(ReadableDuration duration) {
+    // Note that since Joda objects use millisecond resolution, we always
+    // produce either no fractional seconds or fractional seconds with
+    // millisecond resolution.
+    long millis = duration.getMillis();
+    long seconds = millis / 1000;
+    millis = millis % 1000;
+    if (millis == 0) {
+      return String.format("%ds", seconds);
+    } else {
+      return String.format("%d.%03ds", seconds, millis);
+    }
+  }
+
+  /**
+   * Converts a Dataflow API duration string into a {@link Duration}.
+   * @return the parsed duration, or null if a parse error occurs
+   */
+  @Nullable
+  public static Duration fromCloudDuration(String duration) {
+    Matcher matcher = DURATION_PATTERN.matcher(duration);
+    if (!matcher.matches()) {
+      return null;
+    }
+    long millis = Long.valueOf(matcher.group(1)) * 1000;
+    String frac = matcher.group(2);
+    if (frac != null) {
+      long fracs = Long.valueOf(frac);
+      if (frac.length() == 3) {  // millisecond resolution
+        millis += fracs;
+      } else if (frac.length() == 6) {  // microsecond resolution
+        millis += fracs / 1000;
+      } else if (frac.length() == 9) {  // nanosecond resolution
+        millis += fracs / 1000000;
+      } else {
+        return null;
+      }
+    }
+    return Duration.millis(millis);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
new file mode 100644
index 0000000..b990212
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.RandomAccessData.RandomAccessDataCoder;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+
+/**
+ * Tests for {@link RandomAccessData}.
+ */
+@RunWith(JUnit4.class)
+public class RandomAccessDataTest {
+  private static final byte[] TEST_DATA_A = new byte[]{ 0x01, 0x02, 0x03 };
+  private static final byte[] TEST_DATA_B = new byte[]{ 0x06, 0x05, 0x04, 0x03 };
+  private static final byte[] TEST_DATA_C = new byte[]{ 0x06, 0x05, 0x03, 0x03 };
+
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  /** Checks the coder properties and the reported encoded sizes for a three byte value. */
+  @Test
+  public void testCoder() throws Exception {
+    RandomAccessDataCoder coder = RandomAccessDataCoder.of();
+
+    // Two distinct instances that hold identical bytes.
+    RandomAccessData first = new RandomAccessData();
+    first.asOutputStream().write(TEST_DATA_A);
+    RandomAccessData second = new RandomAccessData();
+    second.asOutputStream().write(TEST_DATA_A);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, first);
+    CoderProperties.coderDeterministic(coder, first, second);
+    CoderProperties.coderConsistentWithEquals(coder, first, second);
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.structuralValueConsistentWithEquals(coder, first, second);
+
+    assertTrue(coder.isRegisterByteSizeObserverCheap(first, Context.NESTED));
+    assertTrue(coder.isRegisterByteSizeObserverCheap(first, Context.OUTER));
+
+    // TEST_DATA_A is three bytes long; the nested context is expected to cost one more byte.
+    assertEquals(4, coder.getEncodedElementByteSize(first, Context.NESTED));
+    assertEquals(3, coder.getEncodedElementByteSize(first, Context.OUTER));
+  }
+
+  @Test
+  public void testCoderWithPositiveInfinityIsError() throws Exception {
+    expectedException.expect(CoderException.class);
+    expectedException.expectMessage("Positive infinity can not be encoded");
+    RandomAccessDataCoder.of().encode(
+        RandomAccessData.POSITIVE_INFINITY, new ByteArrayOutputStream(), Context.OUTER);
+  }
+
+  /** Exercises ordering, common prefix length, start offsets and positive infinity. */
+  @Test
+  public void testLexicographicalComparator() throws Exception {
+    RandomAccessData.UnsignedLexicographicalComparator comparator =
+        RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR;
+    RandomAccessData infinity = RandomAccessData.POSITIVE_INFINITY;
+
+    RandomAccessData dataA = new RandomAccessData();
+    dataA.asOutputStream().write(TEST_DATA_A);
+    RandomAccessData dataB = new RandomAccessData();
+    dataB.asOutputStream().write(TEST_DATA_B);
+    RandomAccessData dataC = new RandomAccessData();
+    dataC.asOutputStream().write(TEST_DATA_C);
+
+    // Plain ordering in both directions, and reflexivity.
+    assertTrue(comparator.compare(dataA, dataB) < 0);
+    assertTrue(comparator.compare(dataB, dataA) > 0);
+    assertTrue(comparator.compare(dataB, dataB) == 0);
+
+    // TEST_DATA_B and TEST_DATA_C share their first two bytes.
+    assertEquals(2, comparator.commonPrefixLength(dataB, dataC));
+
+    // They differ only at index 2, so a comparison starting at index 3 sees them as equal.
+    assertTrue(comparator.compare(dataB, dataC, 3) == 0);
+
+    // Positive infinity is greater than regular data and equal to itself.
+    assertTrue(comparator.compare(dataA, infinity) < 0);
+    assertTrue(comparator.compare(infinity, infinity) == 0);
+    assertTrue(comparator.compare(infinity, dataA) > 0);
+  }
+
+  /** Equality and hash codes must depend on the valid bytes only. */
+  @Test
+  public void testEqualsAndHashCode() throws Exception {
+    // An instance is equal to itself.
+    RandomAccessData original = new RandomAccessData();
+    original.asOutputStream().write(TEST_DATA_A);
+    assertEquals(original, original);
+    assertEquals(original.hashCode(), original.hashCode());
+
+    // A separate instance with the same bytes is equal and hashes identically.
+    RandomAccessData sameBytes = new RandomAccessData();
+    sameBytes.asOutputStream().write(TEST_DATA_A);
+    assertEquals(original, sameBytes);
+    assertEquals(original.hashCode(), sameBytes.hashCode());
+
+    // Same length, different last byte.
+    RandomAccessData different = new RandomAccessData();
+    different.asOutputStream().write(new byte[]{ 0x01, 0x02, 0x04 });
+    assertNotEquals(original, different);
+    assertNotEquals(original.hashCode(), different.hashCode());
+
+    // Appending more bytes makes the lengths differ as well.
+    different.asOutputStream().write(TEST_DATA_B);
+    assertNotEquals(original, different);
+    assertNotEquals(original.hashCode(), different.hashCode());
+  }
+
+  @Test
+  public void testResetTo() throws Exception {
+    RandomAccessData stream = new RandomAccessData();
+    stream.asOutputStream().write(TEST_DATA_A);
+    stream.resetTo(1);
+    assertEquals(1, stream.size());
+    stream.asOutputStream().write(TEST_DATA_A);
+    assertArrayEquals(new byte[]{ 0x01, 0x01, 0x02, 0x03 },
+        Arrays.copyOf(stream.array(), stream.size()));
+  }
+
+  /**
+   * Verifies that {@code asInputStream(1, 1)} exposes exactly one byte, {@code 0x02}, and then
+   * signals end of stream.
+   */
+  @Test
+  public void testAsInputStream() throws Exception {
+    RandomAccessData stream = new RandomAccessData();
+    stream.asOutputStream().write(TEST_DATA_A);
+    // try-with-resources guarantees the stream is closed even when an assertion below fails.
+    try (InputStream in = stream.asInputStream(1, 1)) {
+      assertEquals(0x02, in.read());
+      // A second read must report end of stream (-1).
+      assertEquals(-1, in.read());
+    }
+  }
+
+  /**
+   * Verifies that {@code readFrom(in, 3, 2)} on a fresh instance results in five bytes: three
+   * zero bytes followed by {@code 0x01, 0x02} (presumably the first two bytes of TEST_DATA_A).
+   */
+  @Test
+  public void testReadFrom() throws Exception {
+    // try-with-resources replaces the trailing manual close(), which was skipped whenever the
+    // assertion failed.
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(TEST_DATA_A)) {
+      RandomAccessData stream = new RandomAccessData();
+      stream.readFrom(bais, 3, 2);
+      // Only the first size() bytes of array() are compared.
+      assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00, 0x01, 0x02 },
+          Arrays.copyOf(stream.array(), stream.size()));
+    }
+  }
+
+  /**
+   * Verifies that {@code writeTo(out, 1, 2)} emits exactly the two bytes {@code 0x05, 0x04}
+   * after TEST_DATA_B has been written into the instance.
+   */
+  @Test
+  public void testWriteTo() throws Exception {
+    // try-with-resources replaces the trailing manual close(), which was skipped whenever the
+    // assertion failed.
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      RandomAccessData stream = new RandomAccessData();
+      stream.asOutputStream().write(TEST_DATA_B);
+      stream.writeTo(baos, 1, 2);
+      assertArrayEquals(new byte[]{ 0x05, 0x04 }, baos.toByteArray());
+    }
+  }
+
+  /**
+   * Verifies that {@code resetTo} with a position past the current end enlarges the array
+   * returned by {@code array()} and leaves the new bytes zeroed.
+   */
+  @Test
+  public void testThatRandomAccessDataGrowsWhenResettingToPositionBeyondEnd() throws Exception {
+    RandomAccessData data = new RandomAccessData(0);
+    // Constructed with 0, so array() starts out with length zero.
+    assertArrayEquals(new byte[0], data.array());
+
+    // Position 3 lies beyond the end, which forces a resize.
+    data.resetTo(3);
+    assertArrayEquals(new byte[3], data.array());
+  }
+
+  @Test
+  public void testThatRandomAccessDataGrowsWhenReading() throws Exception {
+    RandomAccessData stream = new RandomAccessData(0);
+    assertArrayEquals(new byte[0], stream.array());
+    stream.readFrom(new ByteArrayInputStream(TEST_DATA_A), 0, TEST_DATA_A.length);
+    assertArrayEquals(TEST_DATA_A,
+        Arrays.copyOf(stream.array(), TEST_DATA_A.length));
+  }
+
+  @Test
+  public void testIncrement() throws Exception {
+    assertEquals(new RandomAccessData(new byte[]{ 0x00, 0x01 }),
+        new RandomAccessData(new byte[]{ 0x00, 0x00 }).increment());
+    assertEquals(new RandomAccessData(new byte[]{ 0x01, UnsignedBytes.MAX_VALUE }),
+        new RandomAccessData(new byte[]{ 0x00, UnsignedBytes.MAX_VALUE }).increment());
+
+    // Test for positive infinity
+    assertSame(RandomAccessData.POSITIVE_INFINITY, new RandomAccessData(new byte[0]).increment());
+    assertSame(RandomAccessData.POSITIVE_INFINITY,
+        new RandomAccessData(new byte[]{ UnsignedBytes.MAX_VALUE }).increment());
+    assertSame(RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY.increment());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
new file mode 100644
index 0000000..b318dee
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.apache.beam.sdk.util.TimeUtil.fromCloudDuration;
+import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.sdk.util.TimeUtil.toCloudDuration;
+import static org.apache.beam.sdk.util.TimeUtil.toCloudTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link TimeUtil}. */
+@RunWith(JUnit4.class)
+public final class TimeUtilTest {
+  @Test
+  public void toCloudTimeShouldPrintTimeStrings() {
+    assertEquals("1970-01-01T00:00:00Z", toCloudTime(new Instant(0)));
+    assertEquals("1970-01-01T00:00:00.001Z", toCloudTime(new Instant(1)));
+  }
+
+  @Test
+  public void fromCloudTimeShouldParseTimeStrings() {
+    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00Z"));
+    assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001Z"));
+    assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000Z"));
+    assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
+    assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
+    assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
+    assertNull(fromCloudTime(""));
+    assertNull(fromCloudTime("1970-01-01T00:00:00"));
+  }
+
+  @Test
+  public void toCloudDurationShouldPrintDurationStrings() {
+    assertEquals("0s", toCloudDuration(Duration.ZERO));
+    assertEquals("4s", toCloudDuration(Duration.millis(4000)));
+    assertEquals("4.001s", toCloudDuration(Duration.millis(4001)));
+  }
+
+  @Test
+  public void fromCloudDurationShouldParseDurationStrings() {
+    assertEquals(Duration.millis(4000), fromCloudDuration("4s"));
+    assertEquals(Duration.millis(4001), fromCloudDuration("4.001s"));
+    assertEquals(Duration.millis(4001), fromCloudDuration("4.001000s"));
+    assertEquals(Duration.millis(4001), fromCloudDuration("4.001001s"));
+    assertEquals(Duration.millis(4001), fromCloudDuration("4.001000000s"));
+    assertEquals(Duration.millis(4001), fromCloudDuration("4.001000001s"));
+    assertNull(fromCloudDuration(""));
+    assertNull(fromCloudDuration("4"));
+    assertNull(fromCloudDuration("4.1"));
+    assertNull(fromCloudDuration("4.1s"));
+  }
+}