You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:32 UTC

[08/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java
deleted file mode 100644
index 3a18336..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Utilities for working with JSON and other human-readable string formats.
- */
-public class StringUtils {
-  /**
-   * Converts the given array of bytes into a legal JSON string.
-   *
-   * <p>Uses a simple strategy of converting each byte to a single char,
-   * except for non-printable chars, non-ASCII chars, and '%', '\',
-   * and '"', which are encoded as three chars in '%xx' format, where
-   * 'xx' is the hexadecimal encoding of the byte.
-   */
-  public static String byteArrayToJsonString(byte[] bytes) {
-    StringBuilder sb = new StringBuilder(bytes.length * 2);
-    for (byte b : bytes) {
-      if (b >= 32 && b < 127) {
-        // A printable ascii character.
-        char c = (char) b;
-        if (c != '%' && c != '\\' && c != '\"') {
-          // Not an escape prefix or special character, either.
-          // Send through unchanged.
-          sb.append(c);
-          continue;
-        }
-      }
-      // Send through escaped.  Use '%xx' format.
-      sb.append(String.format("%%%02x", b));
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Converts the given string, encoded using {@link #byteArrayToJsonString},
-   * into a byte array.
-   *
-   * @throws IllegalArgumentException if the argument string is not legal
-   */
-  public static byte[] jsonStringToByteArray(String string) {
-    List<Byte> bytes = new ArrayList<>();
-    for (int i = 0; i < string.length(); ) {
-      char c = string.charAt(i);
-      Byte b;
-      if (c == '%') {
-        // Escaped.  Expect '%xx' format.
-        try {
-          b = (byte) Integer.parseInt(string.substring(i + 1, i + 3), 16);
-        } catch (IndexOutOfBoundsException | NumberFormatException exn) {
-          throw new IllegalArgumentException(
-              "not in legal encoded format; " +
-              "substring [" + i + ".." + (i + 2) + "] not in format \"%xx\"",
-              exn);
-        }
-        i += 3;
-      } else {
-        // Send through unchanged.
-        b = (byte) c;
-        i++;
-      }
-      bytes.add(b);
-    }
-    byte[] byteArray = new byte[bytes.size()];
-    int i = 0;
-    for (Byte b : bytes) {
-      byteArray[i++] = b;
-    }
-    return byteArray;
-  }
-
-  private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"DoFn", "Fn"};
-
-  /**
-   * Pattern to match a non-anonymous inner class.
-   * E.g., matches "Foo$Bar", or even "Foo$1$Bar", but not "Foo$1" or "Foo$1$2".
-   */
-  private static final Pattern NAMED_INNER_CLASS =
-      Pattern.compile(".+\\$(?<INNER>[^0-9].*)");
-
-  private static final String ANONYMOUS_CLASS_REGEX = "\\$[0-9]+\\$";
-
-  /**
-   * Returns a simple name for a class.
-   *
-   * <p>Note: this is non-invertible - the name may be simplified to an
-   * extent that it cannot be mapped back to the original class.
-   *
-   * <p>This can be used to generate human-readable names. It
-   * removes the package and outer classes from the name,
-   * and removes common suffixes.
-   *
-   * <p>Examples:
-   * <ul>
-   *   <li>{@code some.package.Word.SummaryDoFn} -> "Summary"
-   *   <li>{@code another.package.PairingFn} -> "Pairing"
-   * </ul>
-   *
-   * @throws IllegalArgumentException if the class is anonymous
-   */
-  public static String approximateSimpleName(Class<?> clazz) {
-    return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
-  }
-
-  /**
-   * Returns a name for a PTransform class.
-   *
-   * <p>This can be used to generate human-readable transform names. It
-   * removes the package from the name, and removes common suffixes.
-   *
-   * <p>It is different than approximateSimpleName:
-   * <ul>
-   *   <li>1. It keeps the outer class names.
-   *   <li>2. It removes the common transform inner class: "Bound".
-   * </ul>
-   *
-   * <p>Examples:
-   * <ul>
-   *   <li>{@code some.package.Word.Summary} -> "Word.Summary"
-   *   <li>{@code another.package.Pairing.Bound} -> "Pairing"
-   * </ul>
-   */
-  public static String approximatePTransformName(Class<?> clazz) {
-    Preconditions.checkArgument(PTransform.class.isAssignableFrom(clazz));
-    return approximateSimpleName(clazz, /* dropOuterClassNames */ false)
-        .replaceFirst("\\.Bound$", "");
-  }
-
-  /**
-   * Calculate the Levenshtein distance between two strings.
-   *
-   * <p>The Levenshtein distance between two words is the minimum number of single-character edits
-   * (i.e. insertions, deletions or substitutions) required to change one string into the other.
-   */
-  public static int getLevenshteinDistance(final String s, final String t) {
-    Preconditions.checkNotNull(s);
-    Preconditions.checkNotNull(t);
-
-    // base cases
-    if (s.equals(t)) {
-      return 0;
-    }
-    if (s.length() == 0) {
-      return t.length();
-    }
-    if (t.length() == 0) {
-      return s.length();
-    }
-
-    // create two work arrays to store integer distances
-    final int[] v0 = new int[t.length() + 1];
-    final int[] v1 = new int[t.length() + 1];
-
-    // initialize v0 (the previous row of distances)
-    // this row is A[0][i]: edit distance for an empty s
-    // the distance is just the number of characters to delete from t
-    for (int i = 0; i < v0.length; i++) {
-      v0[i] = i;
-    }
-
-    for (int i = 0; i < s.length(); i++) {
-      // calculate v1 (current row distances) from the previous row v0
-
-      // first element of v1 is A[i+1][0]
-      //   edit distance is delete (i+1) chars from s to match empty t
-      v1[0] = i + 1;
-
-      // use formula to fill in the rest of the row
-      for (int j = 0; j < t.length(); j++) {
-        int cost = (s.charAt(i) == t.charAt(j)) ? 0 : 1;
-        v1[j + 1] = Math.min(Math.min(v1[j] + 1, v0[j + 1] + 1), v0[j] + cost);
-      }
-
-      // copy v1 (current row) to v0 (previous row) for next iteration
-      System.arraycopy(v1, 0, v0, 0, v0.length);
-    }
-
-    return v1[t.length()];
-  }
-
-  private static String approximateSimpleName(Class<?> clazz, boolean dropOuterClassNames) {
-    Preconditions.checkArgument(!clazz.isAnonymousClass(),
-        "Attempted to get simple name of anonymous class");
-
-    String fullName = clazz.getName();
-    String shortName = fullName.substring(fullName.lastIndexOf('.') + 1);
-
-    // Drop common suffixes for each named component.
-    String[] names = shortName.split("\\$");
-    for (int i = 0; i < names.length; i++) {
-      names[i] = simplifyNameComponent(names[i]);
-    }
-    shortName = Joiner.on('$').join(names);
-
-    if (dropOuterClassNames) {
-      // Simplify inner class name by dropping outer class prefixes.
-      Matcher m = NAMED_INNER_CLASS.matcher(shortName);
-      if (m.matches()) {
-        shortName = m.group("INNER");
-      }
-    } else {
-      // Dropping anonymous outer classes
-      shortName = shortName.replaceAll(ANONYMOUS_CLASS_REGEX, ".");
-      shortName = shortName.replaceAll("\\$", ".");
-    }
-    return shortName;
-  }
-
-  private static String simplifyNameComponent(String name) {
-    for (String suffix : STANDARD_NAME_SUFFIXES) {
-      if (name.endsWith(suffix) && name.length() > suffix.length()) {
-        return name.substring(0, name.length() - suffix.length());
-      }
-    }
-    return name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java
deleted file mode 100644
index c621c55..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.Data;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A collection of static methods for manipulating data structure representations
- * transferred via the Dataflow API.
- */
-public final class Structs {
-  private Structs() {}  // Non-instantiable
-
-  public static String getString(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, String.class, "a string");
-  }
-
-  public static String getString(
-      Map<String, Object> map, String name, @Nullable String defaultValue)
-      throws Exception {
-    return getValue(map, name, String.class, "a string", defaultValue);
-  }
-
-  public static byte[] getBytes(Map<String, Object> map, String name) throws Exception {
-    @Nullable byte[] result = getBytes(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  @Nullable
-  public static byte[] getBytes(Map<String, Object> map, String name, @Nullable byte[] defaultValue)
-      throws Exception {
-    @Nullable String jsonString = getString(map, name, null);
-    if (jsonString == null) {
-      return defaultValue;
-    }
-    // TODO: Need to agree on a format for encoding bytes in
-    // a string that can be sent to the backend, over the cloud
-    // map task work API.  base64 encoding seems pretty common.  Switch to it?
-    return StringUtils.jsonStringToByteArray(jsonString);
-  }
-
-  public static Boolean getBoolean(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, Boolean.class, "a boolean");
-  }
-
-  @Nullable
-  public static Boolean getBoolean(
-      Map<String, Object> map, String name, @Nullable Boolean defaultValue)
-      throws Exception {
-    return getValue(map, name, Boolean.class, "a boolean", defaultValue);
-  }
-
-  public static Long getLong(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, Long.class, "a long");
-  }
-
-  @Nullable
-  public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue)
-      throws Exception {
-    return getValue(map, name, Long.class, "a long", defaultValue);
-  }
-
-  public static Integer getInt(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, Integer.class, "an int");
-  }
-
-  @Nullable
-  public static Integer getInt(Map<String, Object> map, String name, @Nullable Integer defaultValue)
-      throws Exception {
-    return getValue(map, name, Integer.class, "an int", defaultValue);
-  }
-
-  @Nullable
-  public static List<String> getStrings(
-      Map<String, Object> map, String name, @Nullable List<String> defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a string or a list");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a list of strings,
-      // this is an empty list.
-      return Collections.<String>emptyList();
-    }
-    @Nullable String singletonString = decodeValue(value, String.class);
-    if (singletonString != null) {
-      return Collections.singletonList(singletonString);
-    }
-    if (!(value instanceof List)) {
-      throw new IncorrectTypeException(name, map, "a string or a list");
-    }
-    @SuppressWarnings("unchecked")
-    List<Object> elements = (List<Object>) value;
-    List<String> result = new ArrayList<>(elements.size());
-    for (Object o : elements) {
-      @Nullable String s = decodeValue(o, String.class);
-      if (s == null) {
-        throw new IncorrectTypeException(name, map, "a list of strings");
-      }
-      result.add(s);
-    }
-    return result;
-  }
-
-  public static Map<String, Object> getObject(Map<String, Object> map, String name)
-      throws Exception {
-    @Nullable Map<String, Object> result = getObject(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  @Nullable
-  public static Map<String, Object> getObject(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "an object");
-      }
-      return defaultValue;
-    }
-    return checkObject(value, map, name);
-  }
-
-  private static Map<String, Object> checkObject(
-      Object value, Map<String, Object> map, String name) throws Exception {
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as an object, this is an
-      // empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "an object (not a map)");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapValue = (Map<String, Object>) value;
-    if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
-      throw new IncorrectTypeException(name, map,
-          "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
-    }
-    return mapValue;
-  }
-
-  @Nullable
-  public static List<Map<String, Object>> getListOfMaps(Map<String, Object> map, String name,
-      @Nullable List<Map<String, Object>> defaultValue) throws Exception {
-    @Nullable
-    Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a list");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a list,
-      // this is an empty list.
-      return Collections.<Map<String, Object>>emptyList();
-    }
-
-    if (!(value instanceof List)) {
-      throw new IncorrectTypeException(name, map, "a list");
-    }
-
-    List<?> elements = (List<?>) value;
-    for (Object elem : elements) {
-      if (!(elem instanceof Map)) {
-        throw new IncorrectTypeException(name, map, "a list of Map objects");
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> result = (List<Map<String, Object>>) elements;
-    return result;
-  }
-
-  public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name) throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a dictionary, this is
-      // an empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "a dictionary");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> result = (Map<String, Object>) value;
-    return result;
-  }
-
-  @Nullable
-  public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a dictionary");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a dictionary, this is
-      // an empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "a dictionary");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> result = (Map<String, Object>) value;
-    return result;
-  }
-
-  // Builder operations.
-
-  public static void addString(Map<String, Object> map, String name, String value) {
-    addObject(map, name, CloudObject.forString(value));
-  }
-
-  public static void addBoolean(Map<String, Object> map, String name, boolean value) {
-    addObject(map, name, CloudObject.forBoolean(value));
-  }
-
-  public static void addLong(Map<String, Object> map, String name, long value) {
-    addObject(map, name, CloudObject.forInteger(value));
-  }
-
-  public static void addObject(
-      Map<String, Object> map, String name, Map<String, Object> value) {
-    map.put(name, value);
-  }
-
-  public static void addNull(Map<String, Object> map, String name) {
-    map.put(name, Data.nullOf(Object.class));
-  }
-
-  public static void addLongs(Map<String, Object> map, String name, long... longs) {
-    List<Map<String, Object>> elements = new ArrayList<>(longs.length);
-    for (Long value : longs) {
-      elements.add(CloudObject.forInteger(value));
-    }
-    map.put(name, elements);
-  }
-
-  public static void addList(
-      Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) {
-    map.put(name, elements);
-  }
-
-  public static void addStringList(Map<String, Object> map, String name, List<String> elements) {
-    ArrayList<CloudObject> objects = new ArrayList<>(elements.size());
-    for (String element : elements) {
-      objects.add(CloudObject.forString(element));
-    }
-    addList(map, name, objects);
-  }
-
-  public static <T extends Map<String, Object>> void addList(
-      Map<String, Object> map, String name, T[] elements) {
-    map.put(name, Arrays.asList(elements));
-  }
-
-  public static void addDictionary(
-      Map<String, Object> map, String name, Map<String, Object> value) {
-    map.put(name, value);
-  }
-
-  public static void addDouble(Map<String, Object> map, String name, Double value) {
-    addObject(map, name, CloudObject.forFloat(value));
-  }
-
-  // Helper methods for a few of the accessor methods.
-
-  private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type)
-      throws Exception {
-    @Nullable T result = getValue(map, name, clazz, type, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  @Nullable
-  private static <T> T getValue(
-      Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, type);
-      }
-      return defaultValue;
-    }
-    T result = decodeValue(value, clazz);
-    if (result == null) {
-      // The value exists, but can't be decoded.
-      throw new IncorrectTypeException(name, map, type);
-    }
-    return result;
-  }
-
-  @Nullable
-  private static <T> T decodeValue(Object value, Class<T> clazz) {
-    try {
-      if (value.getClass() == clazz) {
-        // decodeValue() is only called for final classes; if the class matches,
-        // it's safe to just return the value, and if it doesn't match, decoding
-        // is needed.
-        return clazz.cast(value);
-      }
-      if (!(value instanceof Map)) {
-        return null;
-      }
-      @SuppressWarnings("unchecked")
-      Map<String, Object> map = (Map<String, Object>) value;
-      @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
-      if (typeName == null) {
-        return null;
-      }
-      @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
-      if (knownType == null) {
-        return null;
-      }
-      @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
-      if (scalar == null) {
-        return null;
-      }
-      return knownType.parse(scalar, clazz);
-    } catch (ClassCastException e) {
-      // If any class cast fails during decoding, the value's not decodable.
-      return null;
-    }
-  }
-
-  private static final class ParameterNotFoundException extends Exception {
-    public ParameterNotFoundException(String name, Map<String, Object> map) {
-      super("didn't find required parameter " + name + " in " + map);
-    }
-  }
-
-  private static final class IncorrectTypeException extends Exception {
-    public IncorrectTypeException(String name, Map<String, Object> map, String type) {
-      super("required parameter " + name + " in " + map + " not " + type);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java
deleted file mode 100644
index 3255ede..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to mark {@link DoFn DoFns} as an internal component of the Dataflow SDK.
- *
- * <p>Currently, the only effect of this is to mark any aggregators reported by an annotated
- * {@code DoFn} as a system counter (as opposed to a user counter).
- *
- * <p>This is internal to the Dataflow SDK.
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface SystemDoFnInternal {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java
deleted file mode 100644
index 1665792..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
-import com.google.cloud.dataflow.sdk.util.state.BagState;
-import com.google.cloud.dataflow.sdk.util.state.CombiningState;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateMerging;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-
-/**
- * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends ReduceFn<K, InputT, OutputT, W> {
-  private static final String BUFFER_NAME = "buf";
-
-  /**
-   * Create a factory that produces {@link SystemReduceFn} instances that buffer all of the
-   * input values in persistent state and produce an {@code Iterable<T>}.
-   */
-  public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
-      buffering(final Coder<T> inputCoder) {
-    final StateTag<Object, BagState<T>> bufferTag =
-        StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
-    return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
-      @Override
-      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
-        StateMerging.prefetchBags(state, bufferTag);
-      }
-
-      @Override
-      public void onMerge(OnMergeContext c) throws Exception {
-        StateMerging.mergeBags(c.state(), bufferTag);
-      }
-    };
-  }
-
-  /**
-   * Create a factory that produces {@link SystemReduceFn} instances that combine all of the input
-   * values using a {@link CombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT,
-      AccumT, OutputT, W>
-      combining(
-          final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
-    if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
-      bufferTag = StateTags.makeSystemTagInternal(
-          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
-              BUFFER_NAME, combineFn.getAccumulatorCoder(),
-              (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-
-    } else {
-      bufferTag = StateTags.makeSystemTagInternal(
-            StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
-                BUFFER_NAME, combineFn.getAccumulatorCoder(),
-                (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-    }
-    return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
-      @Override
-      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
-        StateMerging.prefetchCombiningValues(state, bufferTag);
-      }
-
-      @Override
-      public void onMerge(OnMergeContext c) throws Exception {
-        StateMerging.mergeCombiningValues(c.state(), bufferTag);
-      }
-    };
-  }
-
-  private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
-
-  public SystemReduceFn(
-      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
-    this.bufferTag = bufferTag;
-  }
-
-  @Override
-  public void processValue(ProcessValueContext c) throws Exception {
-    c.state().access(bufferTag).add(c.value());
-  }
-
-  @Override
-  public void prefetchOnTrigger(StateAccessor<K> state) {
-    state.access(bufferTag).readLater();
-  }
-
-  @Override
-  public void onTrigger(OnTriggerContext c) throws Exception {
-    c.output(c.state().access(bufferTag).read());
-  }
-
-  @Override
-  public void clearState(Context c) throws Exception {
-    c.state().access(bufferTag).clear();
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-    return state.access(bufferTag).isEmpty();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java
deleted file mode 100644
index 359e157..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.BearerToken;
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.auth.oauth2.TokenResponse;
-import com.google.api.client.testing.http.MockHttpTransport;
-
-import java.io.IOException;
-
-/**
- * Fake credential, for use in testing.
- */
-public class TestCredential extends Credential {
-
-  /** Access token used when the caller does not supply one. */
-  private static final String DEFAULT_TOKEN = "NULL";
-
-  /** Lifetime reported for every refreshed token: five minutes, expressed in seconds. */
-  private static final long TOKEN_LIFETIME_SECONDS = 5L * 60;
-
-  /** The access token handed back by every refresh. */
-  private final String token;
-
-  /** Creates a credential that uses the default placeholder token. */
-  public TestCredential() {
-    this(DEFAULT_TOKEN);
-  }
-
-  /**
-   * Creates a credential that always refreshes to the given token.
-   *
-   * @param token the access token to report on refresh
-   */
-  public TestCredential(String token) {
-    super(mockTransportBuilder());
-    this.token = token;
-  }
-
-  /** Builds the bearer-token credential builder, wired to a {@link MockHttpTransport}. */
-  private static Builder mockTransportBuilder() {
-    Builder builder = new Builder(BearerToken.authorizationHeaderAccessMethod());
-    builder.setTransport(new MockHttpTransport());
-    return builder;
-  }
-
-  /** Fabricates a token response locally, carrying the configured token and lifetime. */
-  @Override
-  protected TokenResponse executeRefreshToken() throws IOException {
-    TokenResponse refreshed = new TokenResponse();
-    refreshed.setAccessToken(token);
-    refreshed.setExpiresInSeconds(TOKEN_LIFETIME_SECONDS);
-    return refreshed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java
deleted file mode 100644
index 4ff36f7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * {@code TimeDomain} specifies whether an operation is based on
- * timestamps of elements or current "real-world" time as reported while processing.
- */
-public enum TimeDomain {
-  /**
-   * The {@code EVENT_TIME} domain corresponds to the timestamps on the elements. Time advances
-   * as the system watermark advances.
-   */
-  EVENT_TIME,
-
-  /**
-   * The {@code PROCESSING_TIME} domain corresponds to the current (system) time.
-   * This is advanced during execution of the Dataflow pipeline.
-   */
-  PROCESSING_TIME,
-
-  /**
-   * Same as the {@code PROCESSING_TIME} domain, except it won't fire a timer set for time
-   * {@code T} until all timers from earlier stages set for a time earlier than {@code T} have
-   * fired.
-   */
-  SYNCHRONIZED_PROCESSING_TIME;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java
deleted file mode 100644
index 93195a7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import org.joda.time.DateTime;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.ReadableDuration;
-import org.joda.time.ReadableInstant;
-import org.joda.time.chrono.ISOChronology;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for converting between Dataflow API and SDK time
- * representations.
- *
- * <p>Dataflow API times are strings of the form
- * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
- * strings with optional fractional seconds and a 'Z' offset.
- *
- * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
- * that is, seconds with optional fractional seconds and a literal 's' at the end.
- *
- * <p>In both formats, fractional seconds are either three digits (millisecond
- * resolution), six digits (microsecond resolution), or nine digits (nanosecond
- * resolution).
- */
-public final class TimeUtil {
-  private TimeUtil() {}  // Non-instantiable.
-
-  // Groups: (1) optional '-' sign, (2) whole seconds, (3) optional fractional digits.
-  private static final Pattern DURATION_PATTERN =
-      Pattern.compile("(-)?(\\d+)(?:\\.(\\d+))?s");
-  private static final Pattern TIME_PATTERN =
-      Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
-
-  // Sentinel returned by fractionToMillis for a fraction that is not 3, 6 or 9 digits long.
-  private static final int INVALID_FRACTION = -1;
-
-  /**
-   * Converts a {@link ReadableInstant} into a Dataflow API time value.
-   *
-   * <p>The result is always expressed in UTC with a trailing {@code 'Z'}. Fractional seconds
-   * are omitted when the instant falls on a whole second and are otherwise written with
-   * millisecond resolution.
-   *
-   * @param instant the instant to format
-   * @return the formatted time string
-   */
-  public static String toCloudTime(ReadableInstant instant) {
-    // The emitted string always ends in 'Z', so the fields must be read in ISO/UTC regardless
-    // of the zone or calendar system carried by the instant that was passed in.
-    DateTime time = new DateTime(instant, ISOChronology.getInstanceUTC());
-
-    String wholeSeconds = String.format("%04d-%02d-%02dT%02d:%02d:%02d",
-        time.getYear(),
-        time.getMonthOfYear(),
-        time.getDayOfMonth(),
-        time.getHourOfDay(),
-        time.getMinuteOfHour(),
-        time.getSecondOfMinute());
-
-    // Joda objects use millisecond resolution, so the fraction is either absent or three digits.
-    int millis = time.getMillisOfSecond();
-    if (millis == 0) {
-      return wholeSeconds + "Z";
-    }
-    return wholeSeconds + String.format(".%03dZ", millis);
-  }
-
-  /**
-   * Converts a time value received via the Dataflow API into the corresponding
-   * {@link Instant}.
-   *
-   * <p>Fractions finer than a millisecond are truncated.
-   *
-   * @param time the time string to parse
-   * @return the parsed time, or null if a parse error occurs (wrong shape, a fraction that is
-   *     not three, six or nine digits, or a field value outside its valid range)
-   */
-  @Nullable
-  public static Instant fromCloudTime(String time) {
-    Matcher matcher = TIME_PATTERN.matcher(time);
-    if (!matcher.matches()) {
-      return null;
-    }
-
-    int millis = fractionToMillis(matcher.group(7));
-    if (millis == INVALID_FRACTION) {
-      return null;
-    }
-
-    try {
-      // The pattern limits these groups to 2 or 4 digits, so parseInt cannot overflow.
-      return new DateTime(
-          Integer.parseInt(matcher.group(1)),
-          Integer.parseInt(matcher.group(2)),
-          Integer.parseInt(matcher.group(3)),
-          Integer.parseInt(matcher.group(4)),
-          Integer.parseInt(matcher.group(5)),
-          Integer.parseInt(matcher.group(6)),
-          millis,
-          ISOChronology.getInstanceUTC()).toInstant();
-    } catch (IllegalArgumentException e) {
-      // Joda rejects out-of-range fields (e.g. month 13) with an IllegalArgumentException
-      // subtype; report that as a parse error, as the contract above promises.
-      return null;
-    }
-  }
-
-  /**
-   * Converts a {@link ReadableDuration} into a Dataflow API duration string.
-   *
-   * <p>Fractional seconds are omitted for whole-second durations and are otherwise written
-   * with millisecond resolution. Negative durations are written with a single leading
-   * {@code '-'}.
-   *
-   * @param duration the duration to format
-   * @return the formatted duration string
-   */
-  public static String toCloudDuration(ReadableDuration duration) {
-    long totalMillis = duration.getMillis();
-    String sign = totalMillis < 0 ? "-" : "";
-    // Divide before taking the absolute value so that Long.MIN_VALUE cannot overflow.
-    long seconds = Math.abs(totalMillis / 1000);
-    long millis = Math.abs(totalMillis % 1000);
-    if (millis == 0) {
-      return String.format("%s%ds", sign, seconds);
-    }
-    return String.format("%s%d.%03ds", sign, seconds, millis);
-  }
-
-  /**
-   * Converts a Dataflow API duration string into a {@link Duration}.
-   *
-   * <p>Fractions finer than a millisecond are truncated.
-   *
-   * @param duration the duration string to parse
-   * @return the parsed duration, or null if a parse error occurs (wrong shape, a fraction that
-   *     is not three, six or nine digits, or a value too large for a millisecond {@code long})
-   */
-  @Nullable
-  public static Duration fromCloudDuration(String duration) {
-    Matcher matcher = DURATION_PATTERN.matcher(duration);
-    if (!matcher.matches()) {
-      return null;
-    }
-
-    int fracMillis = fractionToMillis(matcher.group(3));
-    if (fracMillis == INVALID_FRACTION) {
-      return null;
-    }
-
-    long seconds;
-    try {
-      seconds = Long.parseLong(matcher.group(2));
-    } catch (NumberFormatException e) {
-      // More digits than a long can hold.
-      return null;
-    }
-    if (seconds > (Long.MAX_VALUE - fracMillis) / 1000) {
-      // Converting to milliseconds would overflow.
-      return null;
-    }
-
-    long millis = seconds * 1000 + fracMillis;
-    return Duration.millis(matcher.group(1) != null ? -millis : millis);
-  }
-
-  /**
-   * Converts the fractional-seconds digits of a Dataflow API value to whole milliseconds.
-   *
-   * <p>The length is checked before the digits are parsed, so an over-long fraction is
-   * rejected instead of overflowing {@code int}.
-   *
-   * @param frac the digits after the decimal point, or null when there is no fraction
-   * @return the truncated milliseconds, 0 for a null fraction, or {@link #INVALID_FRACTION}
-   *     when the fraction is not exactly three, six or nine digits long
-   */
-  private static int fractionToMillis(@Nullable String frac) {
-    if (frac == null) {
-      return 0;
-    }
-    switch (frac.length()) {
-      case 3:  // millisecond resolution
-        return Integer.parseInt(frac);
-      case 6:  // microsecond resolution
-        return Integer.parseInt(frac) / 1000;
-      case 9:  // nanosecond resolution
-        return Integer.parseInt(frac) / 1000000;
-      default:
-        return INVALID_FRACTION;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
deleted file mode 100644
index c823ed3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.InstantCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * Encapsulate interaction with time within the execution environment.
- *
- * <p>This class allows setting and deleting timers, and also retrieving an
- * estimate of the current time.
- */
-public interface TimerInternals {
-
-  /**
-   * Writes out a timer to be fired when the watermark reaches the given
-   * timestamp.
-   *
-   * <p>The combination of {@code namespace}, {@code timestamp} and {@code domain} uniquely
-   * identify a timer. Multiple timers set for the same parameters can be safely deduplicated.
-   */
-  void setTimer(TimerData timerKey);
-
-  /**
-   * Deletes the given timer.
-   */
-  void deleteTimer(TimerData timerKey);
-
-  /**
-   * Returns the current timestamp in the {@link TimeDomain#PROCESSING_TIME} time domain.
-   */
-  Instant currentProcessingTime();
-
-  /**
-   * Returns the current timestamp in the {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} time
-   * domain or {@code null} if unknown.
-   */
-  @Nullable
-  Instant currentSynchronizedProcessingTime();
-
-  /**
-   * Return the current, local input watermark timestamp for this computation
-   * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
-   *
-   * <p>This value:
-   * <ol>
-   * <li>Is monotonically increasing.
-   * <li>May differ between workers due to network and other delays.
-   * <li>Will never be ahead of the global input watermark for this computation. But it
-   * may be arbitrarily behind the global input watermark.
-   * <li>Any element with a timestamp before the local input watermark can be considered
-   * 'locally late' and be subject to special processing or be dropped entirely.
-   * </ol>
-   *
-   * <p>Note that because the local input watermark can be behind the global input watermark,
-   * it is possible for an element to be considered locally on-time even though it is
-   * globally late.
-   */
-  @Nullable
-  Instant currentInputWatermarkTime();
-
-  /**
-   * Return the current, local output watermark timestamp for this computation
-   * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
-   *
-   * <p>This value:
-   * <ol>
-   * <li>Is monotonically increasing.
-   * <li>Will never be ahead of {@link #currentInputWatermarkTime} as returned above.
-   * <li>May differ between workers due to network and other delays.
-   * <li>However will never be behind the global input watermark for any following computation.
-   * </ol>
-   *
-   * <p> In pictures:
-   * <pre>
-   *  |              |       |       |       |
-   *  |              |   D   |   C   |   B   |   A
-   *  |              |       |       |       |
-   * GIWM     <=    GOWM <= LOWM <= LIWM <= GIWM
-   * (next stage)
-   * -------------------------------------------------> event time
-   * </pre>
-   * where
-   * <ul>
-   * <li> LOWM = local output water mark.
-   * <li> GOWM = global output water mark.
-   * <li> GIWM = global input water mark.
-   * <li> LIWM = local input water mark.
-   * <li> A = A globally on-time element.
-   * <li> B = A globally late, but locally on-time element.
-   * <li> C = A locally late element which may still contribute to the timestamp of a pane.
-   * <li> D = A locally late element which cannot contribute to the timestamp of a pane.
-   * </ul>
-   *
-   * <p>Note that if a computation emits an element which is not before the current output watermark
-   * then that element will always appear locally on-time in all following computations. However,
-   * it is possible for an element emitted before the current output watermark to appear locally
-   * on-time in a following computation. Thus we must be careful to never assume locally late data
-   * viewed on the output of a computation remains locally late on the input of a following
-   * computation.
-   */
-  @Nullable
-  Instant currentOutputWatermarkTime();
-
-  /**
-   * Data about a timer as represented within {@link TimerInternals}.
-   *
-   * <p>A timer is identified by its namespace, timestamp and time domain; all three take part
-   * in {@link #equals}, {@link #hashCode} and {@link #compareTo}.
-   */
-  public static class TimerData implements Comparable<TimerData> {
-    private final StateNamespace namespace;
-    private final Instant timestamp;
-    private final TimeDomain domain;
-
-    private TimerData(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
-      this.namespace = checkNotNull(namespace);
-      this.timestamp = checkNotNull(timestamp);
-      this.domain = checkNotNull(domain);
-    }
-
-    /** Returns the state namespace this timer is scoped to. */
-    public StateNamespace getNamespace() {
-      return namespace;
-    }
-
-    /** Returns the timestamp this timer is set for. */
-    public Instant getTimestamp() {
-      return timestamp;
-    }
-
-    /** Returns the time domain in which {@link #getTimestamp} is interpreted. */
-    public TimeDomain getDomain() {
-      return domain;
-    }
-
-    /**
-     * Construct the {@code TimerData} for the given parameters.
-     *
-     * @throws NullPointerException if any argument is null
-     */
-    public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
-      return new TimerData(namespace, timestamp, domain);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-
-      if (!(obj instanceof TimerData)) {
-        return false;
-      }
-
-      TimerData that = (TimerData) obj;
-      return Objects.equals(this.domain, that.domain)
-          && this.timestamp.isEqual(that.timestamp)
-          && Objects.equals(this.namespace, that.namespace);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(domain, timestamp, namespace);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("namespace", namespace)
-          .add("timestamp", timestamp)
-          .add("domain", domain)
-          .toString();
-    }
-
-    /**
-     * Orders timers by timestamp, then by time domain, then by the namespace's string key.
-     *
-     * <p>The timestamp remains the primary key, so earliest-first ordering is unchanged. The
-     * tie-breakers keep the ordering consistent with {@link #equals}: comparing on the
-     * timestamp alone would make a sorted set or map treat two different timers set for the
-     * same instant as duplicates.
-     * NOTE(review): this assumes distinct namespaces have distinct string keys; confirm.
-     */
-    @Override
-    public int compareTo(TimerData o) {
-      int byTimestamp = Long.compare(timestamp.getMillis(), o.getTimestamp().getMillis());
-      if (byTimestamp != 0) {
-        return byTimestamp;
-      }
-      int byDomain = domain.compareTo(o.domain);
-      if (byDomain != 0) {
-        return byDomain;
-      }
-      if (namespace.equals(o.namespace)) {
-        return 0;
-      }
-      return namespace.stringKey().compareTo(o.namespace.stringKey());
-    }
-  }
-
-  /**
-   * A {@link Coder} for {@link TimerData}.
-   *
-   * <p>A timer is written as three nested fields in this order: the namespace's string key,
-   * the timestamp, and the name of the time domain.
-   */
-  public class TimerDataCoder extends StandardCoder<TimerData> {
-    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
-    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
-    private final Coder<? extends BoundedWindow> windowCoder;
-
-    /** Returns a coder that passes {@code windowCoder} to the namespace parser on decode. */
-    public static TimerDataCoder of(Coder<? extends BoundedWindow> windowCoder) {
-      return new TimerDataCoder(windowCoder);
-    }
-
-    /**
-     * Creates the coder from its component encodings, as invoked through {@link JsonCreator}.
-     *
-     * @param components must contain exactly one element, the window coder
-     * @throws IllegalArgumentException if {@code components} does not have exactly one element
-     */
-    @SuppressWarnings("unchecked")
-    @JsonCreator
-    public static TimerDataCoder of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      Preconditions.checkArgument(components.size() == 1,
-          "Expecting 1 components, got " + components.size());
-      return of((Coder<? extends BoundedWindow>) components.get(0));
-    }
-
-    private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) {
-      this.windowCoder = windowCoder;
-    }
-
-    @Override
-    public void encode(TimerData timer, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      // Every field, including the last one, is written in the nested context.
-      Context nestedContext = context.nested();
-      STRING_CODER.encode(timer.namespace.stringKey(), outStream, nestedContext);
-      INSTANT_CODER.encode(timer.timestamp, outStream, nestedContext);
-      STRING_CODER.encode(timer.domain.name(), outStream, nestedContext);
-    }
-
-    @Override
-    public TimerData decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      // Fields are read back in the order encode wrote them.
-      Context nestedContext = context.nested();
-      StateNamespace namespace =
-          StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
-      Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
-      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext));
-      return TimerData.of(namespace, timestamp, domain);
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.asList(windowCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("window coder must be deterministic", windowCoder);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
deleted file mode 100644
index 7d4b4f2..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * Interface for interacting with time.
- */
-@Experimental(Experimental.Kind.TIMERS)
-public interface Timers {
-  /**
-   * Registers a timer that fires once the relevant clock passes {@code timestamp}. Depending
-   * on {@code timeDomain}, that clock is the event time watermark, the current processing
-   * time, or the synchronized processing time watermark.
-   *
-   * <p>See {@link TimeDomain} for details on the time domains available.
-   *
-   * <p>Firing is not guaranteed to be immediate; the timer is delivered at some later time.
-   *
-   * <p>Each implementation of {@link Timers} implicitly scopes the timers set through it,
-   * for example to a key and window, or to a key, window, and trigger.
-   *
-   * @param timestamp the time at which the timer should be delivered
-   * @param timeDomain the domain that the {@code timestamp} applies to
-   */
-  void setTimer(Instant timestamp, TimeDomain timeDomain);
-
-  /** Removes the timer set in this context for the {@code timestamp} and {@code timeDomain}. */
-  void deleteTimer(Instant timestamp, TimeDomain timeDomain);
-
-  /** Returns the current processing time. */
-  Instant currentProcessingTime();
-
-  /** Returns the current synchronized processing time, or {@code null} if it is unknown. */
-  @Nullable
-  Instant currentSynchronizedProcessingTime();
-
-  /** Returns the current event time, or {@code null} if it is unknown. */
-  @Nullable
-  Instant currentEventTime();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
deleted file mode 100644
index 15fe286..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.storage.Storage;
-import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-
-/**
- * Helpers for cloud communication.
- */
-public class Transport {
-
-  /**
-   * Holds the shared JSON factory and HTTP transport. Both are created in this class's static
-   * initializer, which runs when the class is first used.
-   */
-  private static class SingletonHelper {
-    /** Global instance of the JSON factory. */
-    private static final JsonFactory JSON_FACTORY;
-
-    /** Global instance of the HTTP transport. */
-    private static final HttpTransport HTTP_TRANSPORT;
-
-    static {
-      try {
-        JSON_FACTORY = JacksonFactory.getDefaultInstance();
-        HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
-      } catch (GeneralSecurityException | IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /** Returns the shared HTTP transport. */
-  public static HttpTransport getTransport() {
-    return SingletonHelper.HTTP_TRANSPORT;
-  }
-
-  /** Returns the shared JSON factory. */
-  public static JsonFactory getJsonFactory() {
-    return SingletonHelper.JSON_FACTORY;
-  }
-
-  /** The root URL and service path that together address an API endpoint. */
-  private static class ApiComponents {
-    public final String rootUrl;
-    public final String servicePath;
-
-    public ApiComponents(String root, String path) {
-      this.rootUrl = root;
-      this.servicePath = path;
-    }
-  }
-
-  /**
-   * Splits a full endpoint URL into its root URL ({@code protocol://host[:port]}) and its path.
-   *
-   * @param urlString the endpoint URL to split
-   * @throws RuntimeException if {@code urlString} is not a valid URL; the underlying
-   *     {@link MalformedURLException} is attached as the cause
-   */
-  private static ApiComponents apiComponentsFromUrl(String urlString) {
-    try {
-      URL url = new URL(urlString);
-      // getPort() is -1 when the URL carries no explicit port.
-      String rootUrl = url.getProtocol() + "://" + url.getHost() +
-          (url.getPort() > 0 ? ":" + url.getPort() : "");
-      return new ApiComponents(rootUrl, url.getPath());
-    } catch (MalformedURLException e) {
-      // Keep the cause so the reason the URL was rejected is not lost.
-      throw new RuntimeException("Invalid URL: " + urlString, e);
-    }
-  }
-
-  /**
-   * Returns a BigQuery client builder.
-   *
-   * <p>Note: this client's endpoint is <b>not</b> modified by the
-   * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
-   */
-  public static Bigquery.Builder
-      newBigQueryClient(BigQueryOptions options) {
-    return new Bigquery.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Pubsub client builder whose root URL is taken from
-   * {@code options.getPubsubRootUrl()}.
-   *
-   * <p>Note: this client's endpoint is <b>not</b> modified by the
-   * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
-   */
-  public static Pubsub.Builder
-      newPubsubClient(DataflowPipelineOptions options) {
-    return new Pubsub.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setRootUrl(options.getPubsubRootUrl())
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Google Cloud Dataflow client builder.
-   *
-   * <p>When {@code options.getDataflowEndpoint()} contains {@code "://"} it is treated as a
-   * full URL and supplies both the root URL and the service path. Otherwise it is used as the
-   * service path under {@code options.getApiRootUrl()}.
-   */
-  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
-    String servicePath = options.getDataflowEndpoint();
-    ApiComponents components;
-    if (servicePath.contains("://")) {
-      components = apiComponentsFromUrl(servicePath);
-    } else {
-      components = new ApiComponents(options.getApiRootUrl(), servicePath);
-    }
-
-    return new Dataflow.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setRootUrl(components.rootUrl)
-        .setServicePath(components.servicePath)
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Cloud Debugger client builder.
-   *
-   * <p>Unlike the other builders in this class, the retry initializer is created with its
-   * no-argument constructor rather than with an explicit list containing 404.
-   */
-  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
-    return new Clouddebugger.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Dataflow client that does not automatically retry failed
-   * requests.
-   *
-   * <p>It starts from {@link #newDataflowClient} and replaces the request initializer with the
-   * bare credential, which removes the retrying initializer from the chain.
-   */
-  public static Dataflow.Builder
-      newRawDataflowClient(DataflowPipelineOptions options) {
-    return newDataflowClient(options)
-        .setHttpRequestInitializer(options.getGcpCredential())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Cloud Storage client builder.
-   *
-   * <p>When {@code options.getGcsEndpoint()} is non-null it is parsed as a full URL and
-   * overrides the builder's root URL and service path.
-   *
-   * <p>Note: this client's endpoint is <b>not</b> modified by the
-   * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
-   */
-  public static Storage.Builder
-      newStorageClient(GcsOptions options) {
-    String servicePath = options.getGcsEndpoint();
-    Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log the code 404. Code up the stack will deal with 404's if needed, and
-            // logging it by default clutters the output during file staging.
-            new RetryHttpRequestInitializer(
-                ImmutableList.of(404), new UploadIdResponseInterceptor())))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-    if (servicePath != null) {
-      ApiComponents components = apiComponentsFromUrl(servicePath);
-      storageBuilder.setRootUrl(components.rootUrl);
-      storageBuilder.setServicePath(components.servicePath);
-    }
-    return storageBuilder;
-  }
-
-  /**
-   * Combines the credential with {@code httpRequestInitializer}.
-   *
-   * @return {@code httpRequestInitializer} itself when {@code credential} is null, otherwise a
-   *     {@link ChainingHttpRequestInitializer} built from the credential followed by
-   *     {@code httpRequestInitializer}
-   */
-  private static HttpRequestInitializer chainHttpRequestInitializer(
-      Credential credential, HttpRequestInitializer httpRequestInitializer) {
-    if (credential == null) {
-      return httpRequestInitializer;
-    } else {
-      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
deleted file mode 100644
index 64ff402..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.TriggerInfo;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.State;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory for creating instances of the various {@link Trigger} contexts.
- *
- * <p>These contexts are highly interdependent and share many fields; it is inadvisable
- * to create them via any means other than this factory class.
- */
-public class TriggerContextFactory<W extends BoundedWindow> {
-
-  private final WindowingStrategy<?, W> windowingStrategy;
-  private StateInternals<?> stateInternals;
-  // Future triggers may be able to exploit the active window to state address window mapping.
-  @SuppressWarnings("unused")
-  private ActiveWindowSet<W> activeWindows;
-  private final Coder<W> windowCoder;
-
-  /**
-   * Creates a factory bound to the given windowing strategy, state internals and active window
-   * set. The window coder is obtained from the strategy's window function.
-   */
-  public TriggerContextFactory(WindowingStrategy<?, W> windowingStrategy,
-      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
-    this.windowCoder = windowingStrategy.getWindowFn().windowCoder();
-    this.windowingStrategy = windowingStrategy;
-    this.activeWindows = activeWindows;
-    this.stateInternals = stateInternals;
-  }
-
-  /**
-   * Builds a plain {@link Trigger.TriggerContext} for {@code rootTrigger} in {@code window},
-   * backed by the given timers and finished set.
-   */
-  public Trigger<W>.TriggerContext base(W window, Timers timers,
-      ExecutableTrigger<W> rootTrigger, FinishedTriggers finishedSet) {
-    Trigger<W>.TriggerContext context =
-        new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
-    return context;
-  }
-
-  /**
-   * Builds a {@link Trigger.OnElementContext} for {@code rootTrigger} in {@code window} that
-   * reports {@code elementTimestamp} as the event timestamp.
-   */
-  public Trigger<W>.OnElementContext createOnElementContext(
-      W window, Timers timers, Instant elementTimestamp,
-      ExecutableTrigger<W> rootTrigger, FinishedTriggers finishedSet) {
-    Trigger<W>.OnElementContext context =
-        new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
-    return context;
-  }
-
-  /**
-   * Builds a {@link Trigger.OnMergeContext} for {@code rootTrigger} in {@code window}.
-   *
-   * @param finishedSet the finished set for {@code window}
-   * @param finishedSets the finished set of each window being merged, keyed by that window
-   */
-  public Trigger<W>.OnMergeContext createOnMergeContext(W window, Timers timers,
-      ExecutableTrigger<W> rootTrigger, FinishedTriggers finishedSet,
-      Map<W, FinishedTriggers> finishedSets) {
-    Trigger<W>.OnMergeContext context =
-        new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
-    return context;
-  }
-
-  public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger<W> trigger) {
-    return new StateAccessorImpl(window, trigger);
-  }
-
-  /**
-   * Builds a {@link MergingStateAccessor} for {@code trigger}.
-   *
-   * @param mergeResult the window whose namespace backs plain {@code access} calls
-   * @param mergingWindows the windows visited by {@code accessInEachMergingWindow}
-   */
-  public MergingStateAccessor<?, W> createMergingStateAccessor(
-      W mergeResult, Collection<W> mergingWindows, ExecutableTrigger<W> trigger) {
-    // Note that the implementation's constructor takes these arguments in the opposite order.
-    MergingStateAccessor<?, W> accessor =
-        new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
-    return accessor;
-  }
-
-  /**
-   * {@link Trigger.TriggerInfo} for one {@link ExecutableTrigger}. Questions about finished state
-   * are answered from, and updates are written to, the {@link FinishedTriggers} set supplied at
-   * construction.
-   */
-  private class TriggerInfoImpl implements Trigger.TriggerInfo<W> {
-
-    protected final ExecutableTrigger<W> trigger;
-    protected final FinishedTriggers finishedSet;
-    // Context passed to the trigger when resetTree() invokes its clear.
-    private final Trigger<W>.TriggerContext context;
-
-    public TriggerInfoImpl(ExecutableTrigger<W> trigger, FinishedTriggers finishedSet,
-        Trigger<W>.TriggerContext context) {
-      this.trigger = trigger;
-      this.finishedSet = finishedSet;
-      this.context = context;
-    }
-
-    /** Returns true unless the windowing strategy's window function reports it is non-merging. */
-    @Override
-    public boolean isMerging() {
-      return !windowingStrategy.getWindowFn().isNonMerging();
-    }
-
-    /** Returns the direct sub-triggers of this trigger. */
-    @Override
-    public Iterable<ExecutableTrigger<W>> subTriggers() {
-      return trigger.subTriggers();
-    }
-
-    /** Returns the sub-trigger at position {@code subtriggerIndex} among the direct sub-triggers. */
-    @Override
-    public ExecutableTrigger<W> subTrigger(int subtriggerIndex) {
-      return trigger.subTriggers().get(subtriggerIndex);
-    }
-
-    /** Returns whether this trigger is marked finished in the finished set. */
-    @Override
-    public boolean isFinished() {
-      return finishedSet.isFinished(trigger);
-    }
-
-    /** Returns whether the sub-trigger at {@code subtriggerIndex} is marked finished. */
-    @Override
-    public boolean isFinished(int subtriggerIndex) {
-      return finishedSet.isFinished(subTrigger(subtriggerIndex));
-    }
-
-    /** Returns true when no direct sub-trigger remains unfinished. */
-    @Override
-    public boolean areAllSubtriggersFinished() {
-      return Iterables.isEmpty(unfinishedSubTriggers());
-    }
-
-    /** Returns the direct sub-triggers that are not marked finished in the finished set. */
-    @Override
-    public Iterable<ExecutableTrigger<W>> unfinishedSubTriggers() {
-      return FluentIterable
-          .from(trigger.subTriggers())
-          .filter(new Predicate<ExecutableTrigger<W>>() {
-            @Override
-            public boolean apply(ExecutableTrigger<W> trigger) {
-              return !finishedSet.isFinished(trigger);
-            }
-          });
-    }
-
-    /**
-     * Returns the first direct sub-trigger, in iteration order, that is not marked finished, or
-     * {@code null} when every sub-trigger is finished.
-     */
-    @Override
-    public ExecutableTrigger<W> firstUnfinishedSubTrigger() {
-      for (ExecutableTrigger<W> subTrigger : trigger.subTriggers()) {
-        if (!finishedSet.isFinished(subTrigger)) {
-          return subTrigger;
-        }
-      }
-      return null;
-    }
-
-    /**
-     * Asks the finished set to clear this trigger recursively, then invokes the trigger's clear
-     * with the context supplied at construction.
-     */
-    @Override
-    public void resetTree() throws Exception {
-      finishedSet.clearRecursively(trigger);
-      trigger.invokeClear(context);
-    }
-
-    /** Records {@code finished} as the finished state of this trigger. */
-    @Override
-    public void setFinished(boolean finished) {
-      finishedSet.setFinished(trigger, finished);
-    }
-
-    /** Records {@code finished} as the finished state of the sub-trigger at the given index. */
-    @Override
-    public void setFinished(boolean finished, int subTriggerIndex) {
-      finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
-    }
-  }
-
-  private class TriggerTimers implements Timers {
-
-    private final Timers timers;
-    private final W window;
-
-    public TriggerTimers(W window, Timers timers) {
-      this.timers = timers;
-      this.window = window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timers.setTimer(timestamp, timeDomain);
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      if (timeDomain == TimeDomain.EVENT_TIME
-          && timestamp.equals(window.maxTimestamp())) {
-        // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
-        // state transitions.
-        return;
-      }
-      timers.deleteTimer(timestamp, timeDomain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  /**
-   * {@link Trigger.MergingTriggerInfo} that, in addition to the finished set of the merge result,
-   * can inspect the finished set of each window being merged.
-   */
-  private class MergingTriggerInfoImpl
-      extends TriggerInfoImpl implements Trigger.MergingTriggerInfo<W> {
-
-    // Finished set of each window being merged, keyed by that window.
-    private final Map<W, FinishedTriggers> finishedSets;
-
-    public MergingTriggerInfoImpl(
-        ExecutableTrigger<W> trigger,
-        FinishedTriggers finishedSet,
-        Trigger<W>.TriggerContext context,
-        Map<W, FinishedTriggers> finishedSets) {
-      super(trigger, finishedSet, context);
-      this.finishedSets = finishedSets;
-    }
-
-    /** Returns true if this trigger is finished in at least one of the merging windows. */
-    @Override
-    public boolean finishedInAnyMergingWindow() {
-      for (FinishedTriggers perWindow : finishedSets.values()) {
-        if (perWindow.isFinished(trigger)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /** Returns true if no merging window has this trigger unfinished. */
-    @Override
-    public boolean finishedInAllMergingWindows() {
-      for (FinishedTriggers perWindow : finishedSets.values()) {
-        if (perWindow.isFinished(trigger)) {
-          continue;
-        }
-        return false;
-      }
-      return true;
-    }
-
-    /** Returns the merging windows in which this trigger is finished. */
-    @Override
-    public Iterable<W> getFinishedMergingWindows() {
-      Predicate<FinishedTriggers> triggerIsFinished = new Predicate<FinishedTriggers>() {
-        @Override
-        public boolean apply(FinishedTriggers perWindow) {
-          return perWindow.isFinished(trigger);
-        }
-      };
-      return Maps.filterValues(finishedSets, triggerIsFinished).keySet();
-    }
-  }
-
-  /**
-   * {@link StateAccessor} that resolves every state tag in the namespace formed from one window
-   * and one trigger index.
-   */
-  private class StateAccessorImpl implements StateAccessor<Object> {
-    protected final int triggerIndex;
-    protected final StateNamespace windowNamespace;
-
-    public StateAccessorImpl(
-        W window,
-        ExecutableTrigger<W> trigger) {
-      // triggerIndex must be assigned first: namespaceFor() reads it.
-      this.triggerIndex = trigger.getTriggerIndex();
-      this.windowNamespace = namespaceFor(window);
-    }
-
-    /** Returns the namespace for {@code window} combined with this accessor's trigger index. */
-    protected StateNamespace namespaceFor(W window) {
-      return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
-    }
-
-    /** Looks up {@code address} in the namespace of the window given at construction. */
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
-      return stateInternals.state(windowNamespace, address);
-    }
-  }
-
-  /**
-   * {@link MergingStateAccessor} whose plain {@code access} targets the merge result window and
-   * whose {@code accessInEachMergingWindow} visits each window being merged.
-   */
-  private class MergingStateAccessorImpl extends StateAccessorImpl
-  implements MergingStateAccessor<Object, W> {
-    // The windows being merged into the merge result.
-    private final Collection<W> activeToBeMerged;
-
-    public MergingStateAccessorImpl(ExecutableTrigger<W> trigger, Collection<W> activeToBeMerged,
-        W mergeResult) {
-      super(mergeResult, trigger);
-      this.activeToBeMerged = activeToBeMerged;
-    }
-
-    /** Looks up {@code address} in the namespace of the merge result window. */
-    @Override
-    public <StateT extends State> StateT access(
-        StateTag<? super Object, StateT> address) {
-      return stateInternals.state(windowNamespace, address);
-    }
-
-    /**
-     * Looks up {@code address} once per merging window.
-     *
-     * @return an immutable map from each merging window to the state found in its namespace
-     */
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super Object, StateT> address) {
-      ImmutableMap.Builder<W, StateT> statePerWindow = ImmutableMap.builder();
-      for (W sourceWindow : activeToBeMerged) {
-        statePerWindow.put(
-            sourceWindow, stateInternals.state(namespaceFor(sourceWindow), address));
-      }
-      return statePerWindow.build();
-    }
-  }
-
-  /**
-   * {@link Trigger.TriggerContext} implementation that bundles a window, a state accessor, a
-   * timer wrapper and trigger info for a single {@link ExecutableTrigger}.
-   */
-  private class TriggerContextImpl extends Trigger<W>.TriggerContext {
-
-    private final W window;
-    private final StateAccessorImpl state;
-    private final Timers timers;
-    private final TriggerInfoImpl triggerInfo;
-
-    private TriggerContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger<W> trigger,
-        FinishedTriggers finishedSet) {
-      // TriggerContext is an inner class of Trigger, so the superclass constructor is invoked
-      // with the trigger spec as its enclosing instance.
-      trigger.getSpec().super();
-      this.window = window;
-      this.state = new StateAccessorImpl(window, trigger);
-      this.timers = new TriggerTimers(window, timers);
-      // The trigger info keeps a reference to this context for use in resetTree().
-      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
-    }
-
-    /**
-     * Returns a context for {@code trigger} that shares this context's window and finished set.
-     * The timers passed along are this context's wrapped timers, which the new context wraps
-     * again.
-     */
-    @Override
-    public Trigger<W>.TriggerContext forTrigger(ExecutableTrigger<W> trigger) {
-      return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
-    }
-
-    @Override
-    public TriggerInfo<W> trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public StateAccessor state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    // Goes through TriggerTimers, so the at-max-timestamp event-time timer is never deleted.
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  /**
-   * {@link Trigger.OnElementContext} implementation: the same bundle as the base trigger context
-   * plus the timestamp of the element being processed and the ability to set timers.
-   */
-  private class OnElementContextImpl extends Trigger<W>.OnElementContext {
-
-    private final W window;
-    private final StateAccessorImpl state;
-    private final Timers timers;
-    private final TriggerInfoImpl triggerInfo;
-    private final Instant eventTimestamp;
-
-    private OnElementContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger<W> trigger,
-        FinishedTriggers finishedSet,
-        Instant eventTimestamp) {
-      // OnElementContext is an inner class of Trigger, so the superclass constructor is invoked
-      // with the trigger spec as its enclosing instance.
-      trigger.getSpec().super();
-      this.window = window;
-      this.state = new StateAccessorImpl(window, trigger);
-      this.timers = new TriggerTimers(window, timers);
-      // The trigger info keeps a reference to this context for use in resetTree().
-      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
-      this.eventTimestamp = eventTimestamp;
-    }
-
-
-    @Override
-    public Instant eventTimestamp() {
-      return eventTimestamp;
-    }
-
-    /**
-     * Returns a context for {@code trigger} that shares this context's window, finished set and
-     * event timestamp. The timers passed along are this context's wrapped timers, which the new
-     * context wraps again.
-     */
-    @Override
-    public Trigger<W>.OnElementContext forTrigger(ExecutableTrigger<W> trigger) {
-      return new OnElementContextImpl(
-          window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
-    }
-
-    @Override
-    public TriggerInfo<W> trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public StateAccessor state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-    }
-
-
-    // Goes through TriggerTimers, so the at-max-timestamp event-time timer is never deleted.
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  /**
-   * {@link Trigger.OnMergeContext} implementation: bundles the merge result window, a merging
-   * state accessor, a timer wrapper and merging trigger info for a single
-   * {@link ExecutableTrigger}.
-   */
-  private class OnMergeContextImpl extends Trigger<W>.OnMergeContext {
-    private final MergingStateAccessor<?, W> state;
-    private final W window;
-    // The windows being merged; this is the key set of the per-window finished sets.
-    private final Collection<W> mergingWindows;
-    private final Timers timers;
-    private final MergingTriggerInfoImpl triggerInfo;
-
-    private OnMergeContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger<W> trigger,
-        FinishedTriggers finishedSet,
-        Map<W, FinishedTriggers> finishedSets) {
-      // OnMergeContext is an inner class of Trigger, so the superclass constructor is invoked
-      // with the trigger spec as its enclosing instance.
-      trigger.getSpec().super();
-      this.mergingWindows = finishedSets.keySet();
-      this.window = window;
-      this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
-      this.timers = new TriggerTimers(window, timers);
-      // The trigger info keeps a reference to this context for use in resetTree().
-      this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
-    }
-
-    /**
-     * Returns a merge context for {@code trigger} that shares this context's window, finished
-     * set and per-window finished sets. The timers passed along are this context's wrapped
-     * timers, which the new context wraps again.
-     */
-    @Override
-    public Trigger<W>.OnMergeContext forTrigger(ExecutableTrigger<W> trigger) {
-      return new OnMergeContextImpl(
-          window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
-    }
-
-    @Override
-    public MergingStateAccessor<?, W> state() {
-      return state;
-    }
-
-    @Override
-    public MergingTriggerInfo<W> trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-    }
-
-    /**
-     * Deletes the timer for {@code timestamp} in {@code domain}.
-     *
-     * <p>The call goes through {@link TriggerTimers}, so a request to delete the event-time timer
-     * at the window's maximum timestamp is ignored.
-     */
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      // Delegate to deleteTimer, as the other contexts do; calling setTimer here would register
-      // the very timer the trigger asked to remove.
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
deleted file mode 100644
index dcfd035..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.cloud.dataflow.sdk.util.state.ValueState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
- *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
- *       (via {@link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
-  private final ExecutableTrigger<W> rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  /**
-   * Creates a runner that executes {@code rootTrigger}, building its contexts through
-   * {@code contextFactory}.
-   *
-   * @throws IllegalStateException if the trigger index of {@code rootTrigger} is not 0
-   */
-  public TriggerRunner(ExecutableTrigger<W> rootTrigger, TriggerContextFactory<W> contextFactory) {
-    boolean hasRootIndex = rootTrigger.getTriggerIndex() == 0;
-    Preconditions.checkState(hasRootIndex);
-    this.contextFactory = contextFactory;
-    this.rootTrigger = rootTrigger;
-  }
-
-  /**
-   * Returns the finished bits held in {@code state}, or an all-unfinished set sized for the whole
-   * trigger tree when the finished set is not needed or nothing has been stored.
-   */
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (isFinishedSetNeeded()) {
-      BitSet persisted = state.read();
-      if (persisted != null) {
-        return FinishedTriggersBitSet.fromBitSet(persisted);
-      }
-    }
-
-    // Either no trigger in the tree will ever have finished bits (so there is no need to read
-    // them) or nothing was stored. So that callers can be agnostic to which, hand back a set
-    // that is all 0 (not finished) for each trigger in the tree.
-    return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  /**
-   * Requests a later read of the finished bits (when they are needed) and then lets the trigger
-   * prefetch whatever it wants for element processing in {@code window}.
-   */
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-      finishedBits.readLater();
-    }
-    Trigger<W> spec = rootTrigger.getSpec();
-    spec.prefetchOnElement(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Requests a later read of the finished bits (when they are needed) and then lets the trigger
-   * prefetch whatever it wants for firing in {@code window}.
-   */
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-      finishedBits.readLater();
-    }
-    Trigger<W> spec = rootTrigger.getSpec();
-    spec.prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Requests a later read of the finished bits (when they are needed) and then lets the trigger
-   * prefetch whatever it wants for deciding whether to fire in {@code window}.
-   */
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-      finishedBits.readLater();
-    }
-    Trigger<W> spec = rootTrigger.getSpec();
-    spec.prefetchShouldFire(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Runs the trigger's on-element logic for a new value with the given timestamp in
-   * {@code window}, then persists any change to the finished set.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
-      throws Exception {
-    ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-    // Work on a copy so that changes can be detected and so that changes made here don't
-    // pollute merging.
-    FinishedTriggersBitSet workingSet = readFinishedBits(finishedBits).copy();
-    Trigger<W>.OnElementContext elementContext = contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, workingSet);
-    rootTrigger.invokeOnElement(elementContext);
-    persistFinishedSet(state, workingSet);
-  }
-
-  /**
-   * Requests a later read of the finished bits of every merging window (when they are needed)
-   * and then lets the trigger prefetch whatever it wants for the merge into {@code window}.
-   */
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
-    if (isFinishedSetNeeded()) {
-      Map<W, ValueState<BitSet>> bitsByWindow =
-          state.accessInEachMergingWindow(FINISHED_BITS_TAG);
-      for (ValueState<BitSet> bits : bitsByWindow.values()) {
-        bits.readLater();
-      }
-    }
-    Trigger<W> spec = rootTrigger.getSpec();
-    spec.prefetchOnMerge(
-        contextFactory.createMergingStateAccessor(window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   *
-   * <p>Reads the finished bits of {@code window} and of every merging window, invokes the
-   * trigger's merge callback with a context built from them, and then persists the resulting
-   * finished set.
-   *
-   * @param window the window the merge context is created for
-   * @param timers timers made available to the trigger through the merge context
-   * @param state accessor used to read and write the finished bits
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger<W>.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-
-    // Clear the finished bits.
-    // NOTE(review): this clears FINISHED_BITS_TAG through the same accessor that
-    // persistFinishedSet just wrote to; confirm that discarding the freshly persisted bits is
-    // intended rather than clearing the bits of the merged-away windows.
-    clearFinished(state);
-  }
-
-  /**
-   * Asks the root trigger whether it should fire in {@code window}. The finished set is read into
-   * a copy and is not persisted afterwards.
-   */
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-    FinishedTriggers workingSet = readFinishedBits(finishedBits).copy();
-    Trigger<W>.TriggerContext fireContext =
-        contextFactory.base(window, timers, rootTrigger, workingSet);
-    return rootTrigger.invokeShouldFire(fireContext);
-  }
-
-  /**
-   * Runs the root trigger's on-fire logic in {@code window} against a copy of the finished set,
-   * then persists any change to that set.
-   */
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-    FinishedTriggersBitSet workingSet = readFinishedBits(finishedBits).copy();
-    Trigger<W>.TriggerContext fireContext =
-        contextFactory.base(window, timers, rootTrigger, workingSet);
-    rootTrigger.invokeOnFire(fireContext);
-    persistFinishedSet(state, workingSet);
-  }
-
-  /**
-   * Stores {@code modifiedFinishedSet} if it differs from what is currently readable from
-   * {@code state}. An empty bit set is stored by clearing the state rather than writing it.
-   * Does nothing when the finished set is not needed.
-   */
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      // Nothing changed, so there is nothing to write.
-      return;
-    }
-
-    BitSet updatedBits = modifiedFinishedSet.getBitSet();
-    if (updatedBits.isEmpty()) {
-      finishedSetState.clear();
-    } else {
-      finishedSetState.write(updatedBits);
-    }
-  }
-
-  /**
-   * Clears the persisted finished bits reachable through {@code state}. Does nothing when the
-   * finished set is not needed.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-    ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
-    finishedBits.clear();
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits anyways.
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
-  }
-
-  /**
-   * Reports whether finished bits have to be read and persisted for the root trigger. Currently
-   * they are skipped only when the root trigger is a {@link DefaultTrigger}.
-   */
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    boolean rootIsDefaultTrigger = rootTrigger.getSpec() instanceof DefaultTrigger;
-    return !rootIsDefaultTrigger;
-  }
-}