You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:39 UTC

[15/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java
deleted file mode 100644
index d51fc7e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * The argument to the {@link Window} transform used to assign elements into
- * windows and to determine how windows are merged.  See {@link Window} for more
- * information on how {@code WindowFn}s are used and for a library of
- * predefined {@code WindowFn}s.
- *
- * <p>Users will generally want to use the predefined
- * {@code WindowFn}s, but it is also possible to create new
- * subclasses.
- *
- * <p>To create a custom {@code WindowFn}, inherit from this class and override all required
- * methods.  If no merging is required, inherit from {@link NonMergingWindowFn}
- * instead.  If no merging is required and each element is assigned to a single window, inherit from
- * {@code PartitioningWindowFn}.  Inheriting from the most specific subclass will enable more
- * optimizations in the runner.
- *
- * @param <T> type of elements being windowed
- * @param <W> {@link BoundedWindow} subclass used to represent the
- *            windows used by this {@code WindowFn}
- */
-public abstract class WindowFn<T, W extends BoundedWindow>
-    implements Serializable {
-  /**
-   * Information available when running {@link #assignWindows}.
-   */
-  public abstract class AssignContext {
-    /**
-     * Returns the current element.
-     */
-    public abstract T element();
-
-    /**
-     * Returns the timestamp of the current element.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns the windows the current element was in, prior to this
-     * {@code WindowFn} being called.
-     */
-    public abstract Collection<? extends BoundedWindow> windows();
-  }
-
-  /**
-   * Given a timestamp and element, returns the set of windows into which it
-   * should be placed.
-   */
-  public abstract Collection<W> assignWindows(AssignContext c) throws Exception;
-
-  /**
-   * Information available when running {@link #mergeWindows}.
-   */
-  public abstract class MergeContext {
-    /**
-     * Returns the current set of windows.
-     */
-    public abstract Collection<W> windows();
-
-    /**
-     * Signals to the framework that the windows in {@code toBeMerged} should
-     * be merged together to form {@code mergeResult}.
-     *
-     * <p>{@code toBeMerged} should be a subset of {@link #windows}
-     * and disjoint from the {@code toBeMerged} set of previous calls
-     * to {@code merge}.
-     *
-     * <p>{@code mergeResult} must either not be in {@link #windows} or be in
-     * {@code toBeMerged}.
-     *
-     * @throws IllegalArgumentException if any elements of toBeMerged are not
-     * in windows(), or have already been merged
-     */
-    public abstract void merge(Collection<W> toBeMerged, W mergeResult)
-        throws Exception;
-  }
-
-  /**
-   * Does whatever merging of windows is necessary.
-   *
-   * <p>See {@link MergeOverlappingIntervalWindows#mergeWindows} for an
-   * example of how to override this method.
-   */
-  public abstract void mergeWindows(MergeContext c) throws Exception;
-
-  /**
-   * Returns whether this performs the same merging as the given
-   * {@code WindowFn}.
-   */
-  public abstract boolean isCompatible(WindowFn<?, ?> other);
-
-  /**
-   * Returns the {@link Coder} used for serializing the windows used
-   * by this windowFn.
-   */
-  public abstract Coder<W> windowCoder();
-
-  /**
-   * Returns the window of the side input corresponding to the given window of
-   * the main input.
-   *
-   * <p>Authors of custom {@code WindowFn}s should override this.
-   */
-  public abstract W getSideInputWindow(final BoundedWindow window);
-
-  /**
-   * @deprecated Implement {@link #getOutputTimeFn} to return one of the appropriate
-   * {@link OutputTimeFns}, or a custom {@link OutputTimeFn} extending
-   * {@link OutputTimeFn.Defaults}.
-   */
-  @Deprecated
-  @Experimental(Kind.OUTPUT_TIME)
-  public Instant getOutputTime(Instant inputTimestamp, W window) {
-    return getOutputTimeFn().assignOutputTime(inputTimestamp, window);
-  }
-
-  /**
-   * Provides a default implementation for {@link WindowingStrategy#getOutputTimeFn()}.
-   * See the full specification there.
-   *
-   * <p>If this {@link WindowFn} doesn't produce overlapping windows, this need not (and probably
-   * should not) override any of the default implementations in {@link OutputTimeFn.Defaults}.
-   *
-   * <p>If this {@link WindowFn} does produce overlapping windows that can be predicted here, it is
-   * suggested that the result in later overlapping windows is past the end of earlier windows so
-   * that the later windows don't prevent the watermark from progressing past the end of the earlier
-   * window.
-   *
-   * <p>For example, a timestamp in a sliding window should be moved past the beginning of the next
-   * sliding window. See {@link SlidingWindows#getOutputTimeFn}.
-   */
-  @Experimental(Kind.OUTPUT_TIME)
-  public OutputTimeFn<? super W> getOutputTimeFn() {
-    return new OutputAtEarliestAssignedTimestamp<>(this);
-  }
-
-  /**
-   * Returns true if this {@code WindowFn} never needs to merge any windows.
-   */
-  public boolean isNonMerging() {
-    return false;
-  }
-
-  /**
-   * Returns true if this {@code WindowFn} assigns each element to a single window.
-   */
-  public boolean assignsToSingleWindow() {
-    return false;
-  }
-
-  /**
-   * A compatibility adapter that will return the assigned timestamps according to the
-   * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps
-   * on the {@link WindowFn} is now deprecated.
-   */
-  private static class OutputAtEarliestAssignedTimestamp<W extends BoundedWindow>
-      extends OutputTimeFn.Defaults<W> {
-
-    private final WindowFn<?, W> windowFn;
-
-    public OutputAtEarliestAssignedTimestamp(WindowFn<?, W> windowFn) {
-      this.windowFn = windowFn;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of {@link WindowFn#getOutputTime windowFn.getOutputTime()}.
-     */
-    @Override
-    @SuppressWarnings("deprecation") // this is an adapter for the deprecated behavior
-    public Instant assignOutputTime(Instant timestamp, W window) {
-      return windowFn.getOutputTime(timestamp, window);
-    }
-
-    @Override
-    public Instant combine(Instant outputTime, Instant otherOutputTime) {
-      return Ordering.natural().min(outputTime, otherOutputTime);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}. When the {@link OutputTimeFn} is not overridden by {@link WindowFn}
-     *         or {@link WindowingStrategy}, the minimum output timestamp is taken, which depends
-     *         only on the minimum input timestamp by monotonicity of {@link #assignOutputTime}.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java
deleted file mode 100644
index 65ccf71..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines the {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} transform
- * for dividing the elements in a PCollection into windows, and the
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for controlling when those
- * elements are output.
- *
- * <p>{@code Window} logically divides up or groups the elements of a
- * {@link com.google.cloud.dataflow.sdk.values.PCollection} into finite windows according to a
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}.
- * The output of {@code Window} contains the same elements as input, but they
- * have been logically assigned to windows. The next
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}s, including one
- * within composite transforms, will group by the combination of keys and
- * windows.
- *
- * <p>Windowing a {@code PCollection} allows chunks of it to be processed
- * individually, before the entire {@code PCollection} is available.  This is
- * especially important for {@code PCollection}s with unbounded size, since the full
- * {@code PCollection} is never available at once.
- *
- * <p>For {@code PCollection}s with a bounded size, by default, all data is implicitly in a
- * single window, and this replicates conventional batch mode. However, windowing can still be a
- * convenient way to express time-sliced algorithms over bounded {@code PCollection}s.
- *
- * <p>As elements are assigned to a window, they are are placed into a pane. When the trigger fires
- * all of the elements in the current pane are output.
- *
- * <p>The {@link com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger} will output a
- * window when the system watermark passes the end of the window.  See
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark} for details on the
- * watermark.
- */
-package com.google.cloud.dataflow.sdk.transforms.windowing;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
deleted file mode 100644
index 69350cb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-
-import java.util.Collection;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Track which active windows have their state associated with merged-away windows.
- *
- * When windows are merged we must track which state previously associated with the merged windows
- * must now be associated with the result window. Some of that state may be combined eagerly when
- * the windows are merged. The rest is combined lazily when the final state is actually
- * required when emitting a pane. We keep track of this using an {@link ActiveWindowSet}.
- *
- * <p>An {@link ActiveWindowSet} considers a window to be in one of the following states:
- *
- * <ol>
- *   <li><b>NEW</b>: The initial state for a window on an incoming element; we do not yet know
- *       if it should be merged into an ACTIVE window, or whether it is already present as an
- *       ACTIVE window, since we have not yet called
- *       {@link WindowFn#mergeWindows}.</li>
- *   <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged
- *       away. The window may have one or more <i>state address</i> windows under which its
- *       non-empty state is stored. A state value for an ACTIVE window must be derived by reading
- *       the state in all of its state address windows.</li>
- *   <li><b>EPHEMERAL</b>: A NEW window that has been merged into an ACTIVE window before any state
- *       has been associated with that window. Thus the window is neither ACTIVE nor MERGED. These
- *       windows are not persistently represented since if they reappear the merge function should
- *       again redirect them to an ACTIVE window. EPHEMERAL windows are an optimization for
- *       the common case of in-order events and {@link Sessions session window} by never associating
- *       state with windows that are created and immediately merged away.</li>
- *   <li><b>MERGED</b>: An ACTIVE window has been merged into another ACTIVE window after it had
- *       state associated with it. The window will thus appear as a state address window for exactly
- *       one ACTIVE window.</li>
- *   <li><b>EXPIRED</b>: The window has expired and may have been garbage collected. No new elements
- *       (even late elements) will ever be assigned to that window. These windows are not explicitly
- *       represented anywhere; it is expected that the user of {@link ActiveWindowSet} will store
- *       no state associated with the window.</li>
- * </ol>
- *
- * <p>
- *
- * <p>If no windows will ever be merged we can use the trivial implementation {@link
- * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this data structure is in
- * {@link MergingActiveWindowSet}.
- *
- * @param <W> the type of window being managed
- */
-public interface ActiveWindowSet<W extends BoundedWindow> {
-  /**
-   * Callback for {@link #merge}.
-   */
-  public interface MergeCallback<W extends BoundedWindow> {
-    /**
-     * Called when windows are about to be merged, but before any {@link #onMerge} callback
-     * has been made.
-     */
-    void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
-        throws Exception;
-
-    /**
-     * Called when windows are about to be merged, after all {@link #prefetchOnMerge} calls
-     * have been made, but before the active window set has been updated to reflect the merge.
-     *
-     * @param toBeMerged the windows about to be merged.
-     * @param activeToBeMerged the subset of {@code toBeMerged} corresponding to windows which
-     * are currently ACTIVE (and about to be merged). The remaining windows have been deemed
-     * EPHEMERAL, and thus have no state associated with them.
-     * @param mergeResult the result window, either a member of {@code toBeMerged} or new.
-     */
-    void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
-        throws Exception;
-  }
-
-  /**
-   * Remove EPHEMERAL windows since we only need to know about them while processing new elements.
-   */
-  void removeEphemeralWindows();
-
-  /**
-   * Save any state changes needed.
-   */
-  void persist();
-
-  /**
-   * Return the ACTIVE window into which {@code window} has been merged.
-   * Return {@code window} itself if it is ACTIVE. Return null if {@code window} has not
-   * yet been seen.
-   */
-  @Nullable
-  W representative(W window);
-
-  /**
-   * Return (a view of) the set of currently ACTIVE windows.
-   */
-  Set<W> getActiveWindows();
-
-  /**
-   * Return {@code true} if {@code window} is ACTIVE.
-   */
-  boolean isActive(W window);
-
-  /**
-   * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
-   * as NEW. All NEW windows will be accounted for as ACTIVE, MERGED or EPHEMERAL by a call
-   * to {@link #merge}.
-   */
-  void addNew(W window);
-
-  /**
-   * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
-   * as ACTIVE.
-   */
-  void addActive(W window);
-
-  /**
-   * Remove {@code window} from the set.
-   */
-  void remove(W window);
-
-  /**
-   * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated with this window set,
-   * merging as many of the active windows as possible. {@code mergeCallback} will be invoked for
-   * each group of windows that are merged. After this no NEW windows will remain, all merge
-   * result windows will be ACTIVE, and all windows which have been merged away will not be ACTIVE.
-   */
-  void merge(MergeCallback<W> mergeCallback) throws Exception;
-
-  /**
-   * Signal that all state in {@link #readStateAddresses} for {@code window} has been merged into
-   * the {@link #writeStateAddress} for {@code window}.
-   */
-  void merged(W window);
-
-  /**
-   * Return the state address windows for ACTIVE {@code window} from which all state associated
-   * should be read and merged.
-   */
-  Set<W> readStateAddresses(W window);
-
-  /**
-   * Return the state address window of ACTIVE {@code window} into which all new state should be
-   * written. Always one of the results of {@link #readStateAddresses}.
-   */
-  W writeStateAddress(W window);
-
-  /**
-   * Return the state address window into which all new state should be written after
-   * ACTIVE windows {@code toBeMerged} have been merged into {@code mergeResult}.
-   */
-  W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java
deleted file mode 100644
index 7a9c877..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.ClassPath;
-import com.google.common.reflect.ClassPath.ClassInfo;
-import com.google.common.reflect.Invokable;
-import com.google.common.reflect.Parameter;
-import com.google.common.reflect.TypeToken;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.lang.reflect.WildcardType;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * Represents the API surface of a package prefix. Used for accessing public classes,
- * methods, and the types they reference, to control what dependencies are re-exported.
- *
- * <p>For the purposes of calculating the public API surface, exposure includes any public
- * or protected occurrence of:
- *
- * <ul>
- * <li>superclasses
- * <li>interfaces implemented
- * <li>actual type arguments to generic types
- * <li>array component types
- * <li>method return types
- * <li>method parameter types
- * <li>type variable bounds
- * <li>wildcard bounds
- * </ul>
- *
- * <p>Exposure is a transitive property. The resulting map excludes primitives
- * and array classes themselves.
- *
- * <p>It is prudent (though not required) to prune prefixes like "java" via the builder
- * method {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references
- * that are not interesting.
- */
-@SuppressWarnings("rawtypes")
-public class ApiSurface {
-  private static Logger logger = LoggerFactory.getLogger(ApiSurface.class);
-
-  /**
-   * Returns an empty {@link ApiSurface}.
-   */
-  public static ApiSurface empty() {
-    logger.debug("Returning an empty ApiSurface");
-    return new ApiSurface(Collections.<Class<?>>emptySet(), Collections.<Pattern>emptySet());
-  }
-
-  /**
-   * Returns an {@link ApiSurface} object representing the given package and all subpackages.
-   */
-  public static ApiSurface ofPackage(String packageName) throws IOException {
-    return ApiSurface.empty().includingPackage(packageName);
-  }
-
-  /**
-   * Returns an {@link ApiSurface} object representing just the surface of the given class.
-   */
-  public static ApiSurface ofClass(Class<?> clazz) {
-    return ApiSurface.empty().includingClass(clazz);
-  }
-
-  /**
-   * Returns an {@link ApiSurface} like this one, but also including the named
-   * package and all of its subpackages.
-   */
-  public ApiSurface includingPackage(String packageName) throws IOException {
-    ClassPath classPath = ClassPath.from(ClassLoader.getSystemClassLoader());
-
-    Set<Class<?>> newRootClasses = Sets.newHashSet();
-    for (ClassInfo classInfo : classPath.getTopLevelClassesRecursive(packageName)) {
-      Class clazz = classInfo.load();
-      if (exposed(clazz.getModifiers())) {
-        newRootClasses.add(clazz);
-      }
-    }
-    logger.debug("Including package {} and subpackages: {}", packageName, newRootClasses);
-    newRootClasses.addAll(rootClasses);
-
-    return new ApiSurface(newRootClasses, patternsToPrune);
-  }
-
-  /**
-   * Returns an {@link ApiSurface} like this one, but also including the given class.
-   */
-  public ApiSurface includingClass(Class<?> clazz) {
-    Set<Class<?>> newRootClasses = Sets.newHashSet();
-    logger.debug("Including class {}", clazz);
-    newRootClasses.add(clazz);
-    newRootClasses.addAll(rootClasses);
-    return new ApiSurface(newRootClasses, patternsToPrune);
-  }
-
-  /**
-   * Returns an {@link ApiSurface} like this one, but pruning transitive
-   * references from classes whose full name (including package) begins with the provided prefix.
-   */
-  public ApiSurface pruningPrefix(String prefix) {
-    return pruningPattern(Pattern.compile(Pattern.quote(prefix) + ".*"));
-  }
-
-  /**
-   * Returns an {@link ApiSurface} like this one, but pruning references from the named
-   * class.
-   */
-  public ApiSurface pruningClassName(String className) {
-    return pruningPattern(Pattern.compile(Pattern.quote(className)));
-  }
-
-  /**
-   * Returns an {@link ApiSurface} like this one, but pruning references from the
-   * provided class.
-   */
-  public ApiSurface pruningClass(Class<?> clazz) {
-    return pruningClassName(clazz.getName());
-  }
-
-  /**
-   * Returns an {@link ApiSurface} like this one, but pruning transitive
-   * references from classes whose full name (including package) begins with the provided prefix.
-   */
-  public ApiSurface pruningPattern(Pattern pattern) {
-    Set<Pattern> newPatterns = Sets.newHashSet();
-    newPatterns.addAll(patternsToPrune);
-    newPatterns.add(pattern);
-    return new ApiSurface(rootClasses, newPatterns);
-  }
-
-  /**
-   * See {@link #pruningPattern(Pattern)}.
-   */
-  public ApiSurface pruningPattern(String patternString) {
-    return pruningPattern(Pattern.compile(patternString));
-  }
-
-  /**
-   * Returns all public classes originally belonging to the package
-   * in the {@link ApiSurface}.
-   */
-  public Set<Class<?>> getRootClasses() {
-    return rootClasses;
-  }
-
-  /**
-   * Returns exposed types in this set, including arrays and primitives as
-   * specified.
-   */
-  public Set<Class<?>> getExposedClasses() {
-    return getExposedToExposers().keySet();
-  }
-
-  /**
-   * Returns a path from an exposed class to a root class. There may be many, but this
-   * gives only one.
-   *
-   * <p>If there are only cycles, with no path back to a root class, throws
-   * IllegalStateException.
-   */
-  public List<Class<?>> getAnyExposurePath(Class<?> exposedClass) {
-    Set<Class<?>> excluded = Sets.newHashSet();
-    excluded.add(exposedClass);
-    List<Class<?>> path = getAnyExposurePath(exposedClass, excluded);
-    if (path == null) {
-      throw new IllegalArgumentException(
-          "Class " + exposedClass + " has no path back to any root class."
-          + " It should never have been considered exposed.");
-    } else {
-      return path;
-    }
-  }
-
-  /**
-   * Returns a path from an exposed class to a root class. There may be many, but this
-   * gives only one. It will not return a path that crosses the excluded classes.
-   *
-   * <p>If there are only cycles or paths through the excluded classes, returns null.
-   *
-   * <p>If the class is not actually in the exposure map, throws IllegalArgumentException
-   */
-  private List<Class<?>> getAnyExposurePath(Class<?> exposedClass, Set<Class<?>> excluded) {
-    List<Class<?>> exposurePath = Lists.newArrayList();
-    exposurePath.add(exposedClass);
-
-    Collection<Class<?>> exposers = getExposedToExposers().get(exposedClass);
-    if (exposers.isEmpty()) {
-      throw new IllegalArgumentException("Class " + exposedClass + " is not exposed.");
-    }
-
-    for (Class<?> exposer : exposers) {
-      if (excluded.contains(exposer)) {
-        continue;
-      }
-
-      // A null exposer means this is already a root class.
-      if (exposer == null) {
-        return exposurePath;
-      }
-
-      List<Class<?>> restOfPath = getAnyExposurePath(
-          exposer,
-          Sets.union(excluded, Sets.newHashSet(exposer)));
-
-      if (restOfPath != null) {
-        exposurePath.addAll(restOfPath);
-        return exposurePath;
-      }
-    }
-    return null;
-  }
-
-  ////////////////////////////////////////////////////////////////////
-
-  // Fields initialized upon construction
-  private final Set<Class<?>> rootClasses;
-  private final Set<Pattern> patternsToPrune;
-
-  // Fields computed on-demand
-  private Multimap<Class<?>, Class<?>> exposedToExposers = null;
-  private Pattern prunedPattern = null;
-  private Set<Type> visited = null;
-
-  private ApiSurface(Set<Class<?>> rootClasses, Set<Pattern> patternsToPrune) {
-    this.rootClasses = rootClasses;
-    this.patternsToPrune = patternsToPrune;
-  }
-
-  /**
-   * A map from exposed types to place where they are exposed, in the sense of being a part
-   * of a public-facing API surface.
-   *
-   * <p>This map is the adjencency list representation of a directed graph, where an edge from type
-   * {@code T1} to type {@code T2} indicates that {@code T2} directly exposes {@code T1} in its API
-   * surface.
-   *
-   * <p>The traversal methods in this class are designed to avoid repeatedly processing types, since
-   * there will almost always be cyclic references.
-   */
-  private Multimap<Class<?>, Class<?>> getExposedToExposers() {
-    if (exposedToExposers == null) {
-      constructExposedToExposers();
-    }
-    return exposedToExposers;
-  }
-
-  /**
-   * See {@link #getExposedToExposers}.
-   */
-  private void constructExposedToExposers() {
-    visited = Sets.newHashSet();
-    exposedToExposers = Multimaps.newSetMultimap(
-        Maps.<Class<?>, Collection<Class<?>>>newHashMap(),
-        new Supplier<Set<Class<?>>>() {
-          @Override
-          public Set<Class<?>> get() {
-            return Sets.newHashSet();
-          }
-        });
-
-    for (Class<?> clazz : rootClasses) {
-      addExposedTypes(clazz, null);
-    }
-  }
-
-  /**
-   * A combined {@code Pattern} that implements all the pruning specified.
-   */
-  private Pattern getPrunedPattern() {
-    if (prunedPattern == null) {
-      constructPrunedPattern();
-    }
-    return prunedPattern;
-  }
-
-  /**
-   * See {@link #getPrunedPattern}.
-   */
-  private void constructPrunedPattern() {
-    Set<String> prunedPatternStrings = Sets.newHashSet();
-    for (Pattern patternToPrune : patternsToPrune) {
-      prunedPatternStrings.add(patternToPrune.pattern());
-    }
-    prunedPattern = Pattern.compile("(" + Joiner.on(")|(").join(prunedPatternStrings) + ")");
-  }
-
-  /**
-   * Whether a type and all that it references should be pruned from the graph.
-   */
-  private boolean pruned(Type type) {
-    return pruned(TypeToken.of(type).getRawType());
-  }
-
-  /**
-   * Whether a class and all that it references should be pruned from the graph.
-   */
-  private boolean pruned(Class<?> clazz) {
-    return clazz.isPrimitive()
-        || clazz.isArray()
-        || getPrunedPattern().matcher(clazz.getName()).matches();
-  }
-
-  /**
-   * Whether a type has already beens sufficiently processed.
-   */
-  private boolean done(Type type) {
-    return visited.contains(type);
-  }
-
-  private void recordExposure(Class<?> exposed, Class<?> cause) {
-    exposedToExposers.put(exposed, cause);
-  }
-
-  private void recordExposure(Type exposed, Class<?> cause) {
-    exposedToExposers.put(TypeToken.of(exposed).getRawType(), cause);
-  }
-
-  private void visit(Type type) {
-    visited.add(type);
-  }
-
-  /**
-   * See {@link #addExposedTypes(Type, Class)}.
-   */
-  private void addExposedTypes(TypeToken type, Class<?> cause) {
-    logger.debug(
-        "Adding exposed types from {}, which is the type in type token {}", type.getType(), type);
-    addExposedTypes(type.getType(), cause);
-  }
-
-  /**
-   * Adds any references learned by following a link from {@code cause} to {@code type}.
-   * This will dispatch according to the concrete {@code Type} implementation. See the
-   * other overloads of {@code addExposedTypes} for their details.
-   */
-  private void addExposedTypes(Type type, Class<?> cause) {
-    if (type instanceof TypeVariable) {
-      logger.debug("Adding exposed types from {}, which is a type variable", type);
-      addExposedTypes((TypeVariable) type, cause);
-    } else if (type instanceof WildcardType) {
-      logger.debug("Adding exposed types from {}, which is a wildcard type", type);
-      addExposedTypes((WildcardType) type, cause);
-    } else if (type instanceof GenericArrayType) {
-      logger.debug("Adding exposed types from {}, which is a generic array type", type);
-      addExposedTypes((GenericArrayType) type, cause);
-    } else if (type instanceof ParameterizedType) {
-      logger.debug("Adding exposed types from {}, which is a parameterized type", type);
-      addExposedTypes((ParameterizedType) type, cause);
-    } else if (type instanceof Class) {
-      logger.debug("Adding exposed types from {}, which is a class", type);
-      addExposedTypes((Class) type, cause);
-    } else {
-      throw new IllegalArgumentException("Unknown implementation of Type");
-    }
-  }
-
-  /**
-   * Adds any types exposed to this set. These will
-   * come from the (possibly absent) bounds on the
-   * type variable.
-   */
-  private void addExposedTypes(TypeVariable type, Class<?> cause) {
-    if (done(type)) {
-      return;
-    }
-    visit(type);
-    for (Type bound : type.getBounds()) {
-      logger.debug("Adding exposed types from {}, which is a type bound on {}", bound, type);
-      addExposedTypes(bound, cause);
-    }
-  }
-
-  /**
-   * Adds any types exposed to this set. These will come from the (possibly absent) bounds on the
-   * wildcard.
-   */
-  private void addExposedTypes(WildcardType type, Class<?> cause) {
-    visit(type);
-    for (Type lowerBound : type.getLowerBounds()) {
-      logger.debug(
-          "Adding exposed types from {}, which is a type lower bound on wildcard type {}",
-          lowerBound,
-          type);
-      addExposedTypes(lowerBound, cause);
-    }
-    for (Type upperBound : type.getUpperBounds()) {
-      logger.debug(
-          "Adding exposed types from {}, which is a type upper bound on wildcard type {}",
-          upperBound,
-          type);
-      addExposedTypes(upperBound, cause);
-    }
-  }
-
-  /**
-   * Adds any types exposed from the given array type. The array type itself is not added. The
-   * cause of the exposure of the underlying type is considered whatever type exposed the array
-   * type.
-   */
-  private void addExposedTypes(GenericArrayType type, Class<?> cause) {
-    if (done(type)) {
-      return;
-    }
-    visit(type);
-    logger.debug(
-        "Adding exposed types from {}, which is the component type on generic array type {}",
-        type.getGenericComponentType(),
-        type);
-    addExposedTypes(type.getGenericComponentType(), cause);
-  }
-
-  /**
-   * Adds any types exposed to this set. Even if the
-   * root type is to be pruned, the actual type arguments
-   * are processed.
-   */
-  private void addExposedTypes(ParameterizedType type, Class<?> cause) {
-    // Even if the type is already done, this link to it may be new
-    boolean alreadyDone = done(type);
-    if (!pruned(type)) {
-      visit(type);
-      recordExposure(type, cause);
-    }
-    if (alreadyDone) {
-      return;
-    }
-
-    // For a parameterized type, pruning does not take place
-    // here, only for the raw class.
-    // The type parameters themselves may not be pruned,
-    // for example with List<MyApiType> probably the
-    // standard List is pruned, but MyApiType is not.
-    logger.debug(
-        "Adding exposed types from {}, which is the raw type on parameterized type {}",
-        type.getRawType(),
-        type);
-    addExposedTypes(type.getRawType(), cause);
-    for (Type typeArg : type.getActualTypeArguments()) {
-      logger.debug(
-          "Adding exposed types from {}, which is a type argument on parameterized type {}",
-          typeArg,
-          type);
-      addExposedTypes(typeArg, cause);
-    }
-  }
-
-  /**
-   * Adds a class and all of the types it exposes. The cause
-   * of the class being exposed is given, and the cause
-   * of everything within the class is that class itself.
-   */
-  private void addExposedTypes(Class<?> clazz, Class<?> cause) {
-    if (pruned(clazz)) {
-      return;
-    }
-    // Even if `clazz` has been visited, the link from `cause` may be new
-    boolean alreadyDone = done(clazz);
-    visit(clazz);
-    recordExposure(clazz, cause);
-    if (alreadyDone || pruned(clazz)) {
-      return;
-    }
-
-    TypeToken<?> token = TypeToken.of(clazz);
-    for (TypeToken<?> superType : token.getTypes()) {
-      if (!superType.equals(token)) {
-        logger.debug(
-            "Adding exposed types from {}, which is a super type token on {}", superType, clazz);
-        addExposedTypes(superType, clazz);
-      }
-    }
-    for (Class innerClass : clazz.getDeclaredClasses()) {
-      if (exposed(innerClass.getModifiers())) {
-        logger.debug(
-            "Adding exposed types from {}, which is an exposed inner class of {}",
-            innerClass,
-            clazz);
-        addExposedTypes(innerClass, clazz);
-      }
-    }
-    for (Field field : clazz.getDeclaredFields()) {
-      if (exposed(field.getModifiers())) {
-        logger.debug("Adding exposed types from {}, which is an exposed field on {}", field, clazz);
-        addExposedTypes(field, clazz);
-      }
-    }
-    for (Invokable invokable : getExposedInvokables(token)) {
-      logger.debug(
-          "Adding exposed types from {}, which is an exposed invokable on {}", invokable, clazz);
-      addExposedTypes(invokable, clazz);
-    }
-  }
-
-  private void addExposedTypes(Invokable<?, ?> invokable, Class<?> cause) {
-    addExposedTypes(invokable.getReturnType(), cause);
-    for (Annotation annotation : invokable.getAnnotations()) {
-      logger.debug(
-          "Adding exposed types from {}, which is an annotation on invokable {}",
-          annotation,
-          invokable);
-     addExposedTypes(annotation.annotationType(), cause);
-    }
-    for (Parameter parameter : invokable.getParameters()) {
-      logger.debug(
-          "Adding exposed types from {}, which is a parameter on invokable {}",
-          parameter,
-          invokable);
-      addExposedTypes(parameter, cause);
-    }
-    for (TypeToken<?> exceptionType : invokable.getExceptionTypes()) {
-      logger.debug(
-          "Adding exposed types from {}, which is an exception type on invokable {}",
-          exceptionType,
-          invokable);
-      addExposedTypes(exceptionType, cause);
-    }
-  }
-
-  private void addExposedTypes(Parameter parameter, Class<?> cause) {
-    logger.debug(
-        "Adding exposed types from {}, which is the type of parameter {}",
-        parameter.getType(),
-        parameter);
-    addExposedTypes(parameter.getType(), cause);
-    for (Annotation annotation : parameter.getAnnotations()) {
-      logger.debug(
-          "Adding exposed types from {}, which is an annotation on parameter {}",
-          annotation,
-          parameter);
-      addExposedTypes(annotation.annotationType(), cause);
-    }
-  }
-
-  private void addExposedTypes(Field field, Class<?> cause) {
-    addExposedTypes(field.getGenericType(), cause);
-    for (Annotation annotation : field.getDeclaredAnnotations()) {
-      logger.debug(
-          "Adding exposed types from {}, which is an annotation on field {}", annotation, field);
-      addExposedTypes(annotation.annotationType(), cause);
-    }
-  }
-
-  /**
-   * Returns an {@link Invokable} for each public methods or constructors of a type.
-   */
-  private Set<Invokable> getExposedInvokables(TypeToken<?> type) {
-    Set<Invokable> invokables = Sets.newHashSet();
-
-    for (Constructor constructor : type.getRawType().getConstructors()) {
-      if (0 != (constructor.getModifiers() & (Modifier.PUBLIC | Modifier.PROTECTED))) {
-        invokables.add(type.constructor(constructor));
-      }
-    }
-
-    for (Method method : type.getRawType().getMethods()) {
-      if (0 != (method.getModifiers() & (Modifier.PUBLIC | Modifier.PROTECTED))) {
-        invokables.add(type.method(method));
-      }
-    }
-
-    return invokables;
-  }
-
-  /**
-   * Returns true of the given modifier bitmap indicates exposure (public or protected access).
-   */
-  private boolean exposed(int modifiers) {
-    return 0 != (modifiers & (Modifier.PUBLIC | Modifier.PROTECTED));
-  }
-
-
-  ////////////////////////////////////////////////////////////////////////////
-
-  public static ApiSurface getSdkApiSurface() throws IOException {
-    return ApiSurface.ofPackage("com.google.cloud.dataflow")
-        .pruningPattern("com[.]google[.]cloud[.]dataflow.*Test")
-        .pruningPattern("com[.]google[.]cloud[.]dataflow.*Benchmark")
-        .pruningPrefix("com.google.cloud.dataflow.integration")
-        .pruningPrefix("java")
-        .pruningPrefix("com.google.api")
-        .pruningPrefix("com.google.auth")
-        .pruningPrefix("com.google.bigtable.v1")
-        .pruningPrefix("com.google.cloud.bigtable.config")
-        .pruningPrefix("com.google.cloud.bigtable.grpc.Bigtable*Name")
-        .pruningPrefix("com.google.protobuf")
-        .pruningPrefix("org.joda.time")
-        .pruningPrefix("org.apache.avro")
-        .pruningPrefix("org.junit")
-        .pruningPrefix("com.fasterxml.jackson.annotation");
-  }
-
-  public static void main(String[] args) throws Exception {
-    List<String> names = Lists.newArrayList();
-    for (Class clazz : getSdkApiSurface().getExposedClasses()) {
-      names.add(clazz.getName());
-    }
-    List<String> sortedNames = Lists.newArrayList(names);
-    Collections.sort(sortedNames);
-
-    for (String name : sortedNames) {
-      System.out.println(name);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java
deleted file mode 100644
index c7fe4b4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.lang.reflect.InvocationTargetException;
-
-/** Stores whether we are running within AppEngine or not. */
-public class AppEngineEnvironment {
-  /**
-   * True if running inside of AppEngine, false otherwise.
-   */
-  @Deprecated
-  public static final boolean IS_APP_ENGINE = isAppEngine();
-
-  /**
-   * Attempts to detect whether we are inside of AppEngine.
-   *
-   * <p>Purposely copied and left private from private <a href="https://code.google.com/p/
-   * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/
-   * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>.
-   *
-   * @return true if we are inside of AppEngine, false otherwise.
-   */
-  static boolean isAppEngine() {
-    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
-      return false;
-    }
-    try {
-      // If the current environment is null, we're not inside AppEngine.
-      return Class.forName("com.google.apphosting.api.ApiProxy")
-          .getMethod("getCurrentEnvironment")
-          .invoke(null) != null;
-    } catch (ClassNotFoundException e) {
-      // If ApiProxy doesn't exist, we're not on AppEngine at all.
-      return false;
-    } catch (InvocationTargetException e) {
-      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
-      return false;
-    } catch (IllegalAccessException e) {
-      // If the method isn't accessible, we're not on a supported version of AppEngine;
-      return false;
-    } catch (NoSuchMethodException e) {
-      // If the method doesn't exist, we're not on a supported version of AppEngine;
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java
deleted file mode 100644
index 512d72d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.Serializable;
-
-/**
- * A {@link KeyedCombineFnWithContext} with a fixed accumulator coder. This is created from a
- * specific application of the {@link KeyedCombineFnWithContext}.
- *
- *  <p>Because the {@code AccumT} may reference {@code InputT}, the specific {@code Coder<AccumT>}
- *  may depend on the {@code Coder<InputT>}.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
-public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializable {
-
-  private final PerKeyCombineFn<K, InputT, AccumT, OutputT> fn;
-  private final Coder<AccumT> accumulatorCoder;
-
-  private final Iterable<PCollectionView<?>> sideInputViews;
-  private final KvCoder<K, InputT> kvCoder;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-
-  private AppliedCombineFn(PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
-      Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
-      KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = fn;
-    this.accumulatorCoder = accumulatorCoder;
-    this.sideInputViews = sideInputViews;
-    this.kvCoder = kvCoder;
-    this.windowingStrategy = windowingStrategy;
-  }
-
-  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
-      withAccumulatorCoder(
-          PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
-          Coder<AccumT> accumCoder) {
-    return withAccumulatorCoder(fn, accumCoder, null, null, null);
-  }
-
-  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
-      withAccumulatorCoder(
-          PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
-          Coder<AccumT> accumCoder, Iterable<PCollectionView<?>> sideInputViews,
-          KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
-    // Casting down the K and InputT is safe because they're only used as inputs.
-    @SuppressWarnings("unchecked")
-    PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
-        (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
-    return create(clonedFn, accumCoder, sideInputViews, kvCoder, windowingStrategy);
-  }
-
-  @VisibleForTesting
-  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
-      withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
-          CoderRegistry registry, KvCoder<K, InputT> kvCoder) {
-    return withInputCoder(fn, registry, kvCoder, null, null);
-  }
-
-  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
-      withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
-          CoderRegistry registry, KvCoder<K, InputT> kvCoder,
-          Iterable<PCollectionView<?>> sideInputViews, WindowingStrategy<?, ?> windowingStrategy) {
-    // Casting down the K and InputT is safe because they're only used as inputs.
-    @SuppressWarnings("unchecked")
-    PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
-        (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
-    try {
-      Coder<AccumT> accumulatorCoder = clonedFn.getAccumulatorCoder(
-          registry, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
-      return create(clonedFn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException("Could not determine coder for accumulator", e);
-    }
-  }
-
-  private static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> create(
-      PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
-      Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
-      KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
-    return new AppliedCombineFn<>(
-        fn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
-  }
-
-  public PerKeyCombineFn<K, InputT, AccumT, OutputT> getFn() {
-    return fn;
-  }
-
-  public Iterable<PCollectionView<?>> getSideInputViews() {
-    return sideInputViews;
-  }
-
-  public Coder<AccumT> getAccumulatorCoder() {
-    return accumulatorCoder;
-  }
-
-  public KvCoder<K, InputT> getKvCoder() {
-    return kvCoder;
-  }
-
-  public WindowingStrategy<?, ?> getWindowingStrategy() {
-    return windowingStrategy;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java
deleted file mode 100644
index ca59c53..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-/**
- * {@link DoFn} that tags elements of a PCollection with windows, according
- * to the provided {@link WindowFn}.
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
-    this.fn = fn;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void processElement(final ProcessContext c) throws Exception {
-    Collection<W> windows =
-        ((WindowFn<T, W>) fn).assignWindows(
-            ((WindowFn<T, W>) fn).new AssignContext() {
-                @Override
-                public T element() {
-                  return c.element();
-                }
-
-                @Override
-                public Instant timestamp() {
-                  return c.timestamp();
-                }
-
-                @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
-                }
-              });
-
-    c.windowingInternals()
-        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
deleted file mode 100644
index e94d414..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.NanoClock;
-import com.google.common.base.Preconditions;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff
- * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff
- * unless the time interval has expired since the object was created. At this point, it will always
- * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts,
- * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the
- * constructor.
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff {
-  private long endTimeMillis;
-  private long maximumTotalWaitTimeMillis;
-  private ResetPolicy resetPolicy;
-  private final NanoClock nanoClock;
-  // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns.  Here, we choose 2^53 ns as
-  // a smaller but still huge limit.
-  private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53;
-
-  /**
-   * A ResetPolicy controls the behavior of this BackOff when reset() is called.  By default, both
-   * the number of attempts and the time bound for the BackOff are reset, but an alternative
-   * ResetPolicy may be set to only reset one of these two.
-   */
-  public static enum ResetPolicy {
-    ALL,
-    ATTEMPTS,
-    TIMER
-  }
-
-  /**
-   * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
-   *
-   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
-   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
-   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
-   *    allow more attempts in milliseconds.
-   */
-  public AttemptAndTimeBoundedExponentialBackOff(
-      int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) {
-    this(
-        maximumNumberOfAttempts,
-        initialIntervalMillis,
-        maximumTotalWaitTimeMillis,
-        ResetPolicy.ALL,
-        NanoClock.SYSTEM);
-  }
-
-  /**
-   * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
-   *
-   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
-   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
-   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
-   *    allow more attempts in milliseconds.
-   * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
-   *    to being reset.
-   */
-  public AttemptAndTimeBoundedExponentialBackOff(
-      int maximumNumberOfAttempts,
-      long initialIntervalMillis,
-      long maximumTotalWaitTimeMillis,
-      ResetPolicy resetPolicy) {
-    this(
-        maximumNumberOfAttempts,
-        initialIntervalMillis,
-        maximumTotalWaitTimeMillis,
-        resetPolicy,
-        NanoClock.SYSTEM);
-  }
-
-  /**
-   * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
-   *
-   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
-   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
-   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
-   *    allow more attempts in milliseconds.
-   * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
-   *    to being reset.
-   * @param nanoClock clock used to measure the time that has passed.
-   */
-  public AttemptAndTimeBoundedExponentialBackOff(
-      int maximumNumberOfAttempts,
-      long initialIntervalMillis,
-      long maximumTotalWaitTimeMillis,
-      ResetPolicy resetPolicy,
-      NanoClock nanoClock) {
-    super(maximumNumberOfAttempts, initialIntervalMillis);
-    Preconditions.checkArgument(
-        maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero.");
-    Preconditions.checkArgument(
-        maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS,
-        "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds");
-    Preconditions.checkArgument(resetPolicy != null, "resetPolicy may not be null");
-    Preconditions.checkArgument(nanoClock != null, "nanoClock may not be null");
-    this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis;
-    this.resetPolicy = resetPolicy;
-    this.nanoClock = nanoClock;
-    // Set the end time for this BackOff.  Note that we cannot simply call reset() here since the
-    // resetPolicy may not be set to reset the time bound.
-    endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
-  }
-
-  @Override
-  public void reset() {
-    // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are
-    // set.  In this case, we call the parent class's reset() method and return.
-    if (resetPolicy == null) {
-      super.reset();
-      return;
-    }
-    // Reset the number of attempts.
-    if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) {
-      super.reset();
-    }
-    // Reset the time bound.
-    if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) {
-      endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
-    }
-  }
-
-  public void setEndtimeMillis(long endTimeMillis) {
-    this.endTimeMillis = endTimeMillis;
-  }
-
-  @Override
-  public long nextBackOffMillis() {
-    if (atMaxAttempts()) {
-      return BackOff.STOP;
-    }
-    long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis());
-    return (backoff > 0 ? backoff : BackOff.STOP);
-  }
-
-  private long getTimeMillis() {
-    return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime());
-  }
-
-  @Override
-  public boolean atMaxAttempts() {
-    return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java
deleted file mode 100644
index 613316e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.common.base.Preconditions;
-
-/**
- * Implementation of {@link BackOff} that increases the back off period for each retry attempt
- * using a randomization function that grows exponentially.
- *
- * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10.
- * For 10 tries the sequence will be (values in seconds):
- *
- * <pre>
- * retry#      retry_interval     randomized_interval
- * 1             0.5                [0.25,   0.75]
- * 2             0.75               [0.375,  1.125]
- * 3             1.125              [0.562,  1.687]
- * 4             1.687              [0.8435, 2.53]
- * 5             2.53               [1.265,  3.795]
- * 6             3.795              [1.897,  5.692]
- * 7             5.692              [2.846,  8.538]
- * 8             8.538              [4.269, 12.807]
- * 9            12.807              [6.403, 19.210]
- * 10           {@link BackOff#STOP}
- * </pre>
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptBoundedExponentialBackOff implements BackOff {
-  public static final double DEFAULT_MULTIPLIER = 1.5;
-  public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
-  private final int maximumNumberOfAttempts;
-  private final long initialIntervalMillis;
-  private int currentAttempt;
-
-  public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) {
-    Preconditions.checkArgument(maximumNumberOfAttempts > 0,
-        "Maximum number of attempts must be greater than zero.");
-    Preconditions.checkArgument(initialIntervalMillis > 0,
-        "Initial interval must be greater than zero.");
-    this.maximumNumberOfAttempts = maximumNumberOfAttempts;
-    this.initialIntervalMillis = initialIntervalMillis;
-    reset();
-  }
-
-  @Override
-  public void reset() {
-    currentAttempt = 1;
-  }
-
-  @Override
-  public long nextBackOffMillis() {
-    if (currentAttempt >= maximumNumberOfAttempts) {
-      return BackOff.STOP;
-    }
-    double currentIntervalMillis = initialIntervalMillis
-        * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1);
-    double randomOffset = (Math.random() * 2 - 1)
-        * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
-    currentAttempt += 1;
-    return Math.round(currentIntervalMillis + randomOffset);
-  }
-
-  public boolean atMaxAttempts() {
-    return currentAttempt >= maximumNumberOfAttempts;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java
deleted file mode 100644
index c3a4861..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DecoderFactory;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A set of utilities for working with Avro files.
- *
- * <p>These utilities are based on the <a
- * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification.
- */
-public class AvroUtils {
-
-  /**
-   * Avro file metadata.
-   */
-  public static class AvroMetadata {
-    private byte[] syncMarker;
-    private String codec;
-    private String schemaString;
-
-    AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
-      this.syncMarker = syncMarker;
-      this.codec = codec;
-      this.schemaString = schemaString;
-    }
-
-    /**
-     * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
-     * string for the file.
-     */
-    public String getSchemaString() {
-      return schemaString;
-    }
-
-    /**
-     * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
-     * file.
-     */
-    public String getCodec() {
-      return codec;
-    }
-
-    /**
-     * The 16-byte sync marker for the file.  See the documentation for
-     * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object
-     * Container File</a> for more information.
-     */
-    public byte[] getSyncMarker() {
-      return syncMarker;
-    }
-  }
-
-  /**
-   * Reads the {@link AvroMetadata} from the header of an Avro file.
-   *
-   * <p>This method parses the header of an Avro
-   * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">
-   * Object Container File</a>.
-   *
-   * @throws IOException if the file is an invalid format.
-   */
-  public static AvroMetadata readMetadataFromFile(String fileName) throws IOException {
-    String codec = null;
-    String schemaString = null;
-    byte[] syncMarker;
-    try (InputStream stream =
-        Channels.newInputStream(IOChannelUtils.getFactory(fileName).open(fileName))) {
-      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
-
-      // The header of an object container file begins with a four-byte magic number, followed
-      // by the file metadata (including the schema and codec), encoded as a map. Finally, the
-      // header ends with the file's 16-byte sync marker.
-      // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
-      // the encoding of container files.
-
-      // Read the magic number.
-      byte[] magic = new byte[DataFileConstants.MAGIC.length];
-      decoder.readFixed(magic);
-      if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
-        throw new IOException("Missing Avro file signature: " + fileName);
-      }
-
-      // Read the metadata to find the codec and schema.
-      ByteBuffer valueBuffer = ByteBuffer.allocate(512);
-      long numRecords = decoder.readMapStart();
-      while (numRecords > 0) {
-        for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
-          String key = decoder.readString();
-          // readBytes() clears the buffer and returns a buffer where:
-          // - position is the start of the bytes read
-          // - limit is the end of the bytes read
-          valueBuffer = decoder.readBytes(valueBuffer);
-          byte[] bytes = new byte[valueBuffer.remaining()];
-          valueBuffer.get(bytes);
-          if (key.equals(DataFileConstants.CODEC)) {
-            codec = new String(bytes, "UTF-8");
-          } else if (key.equals(DataFileConstants.SCHEMA)) {
-            schemaString = new String(bytes, "UTF-8");
-          }
-        }
-        numRecords = decoder.mapNext();
-      }
-      if (codec == null) {
-        codec = DataFileConstants.NULL_CODEC;
-      }
-
-      // Finally, read the sync marker.
-      syncMarker = new byte[DataFileConstants.SYNC_SIZE];
-      decoder.readFixed(syncMarker);
-    }
-    return new AvroMetadata(syncMarker, codec, schemaString);
-  }
-
-  /**
-   * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
-   * immutable.
-   */
-  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-  // Package private for BigQueryTableRowIterator to use.
-  static String formatTimestamp(String timestamp) {
-    // timestamp is in "seconds since epoch" format, with scientific notation.
-    // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
-    // Separate into seconds and microseconds.
-    double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
-    long timestampMicros = (long) timestampDoubleMicros;
-    long seconds = timestampMicros / 1000000;
-    int micros = (int) (timestampMicros % 1000000);
-    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
-
-    // No sub-second component.
-    if (micros == 0) {
-      return String.format("%s UTC", dayAndTime);
-    }
-
-    // Sub-second component.
-    int digits = 6;
-    int subsecond = micros;
-    while (subsecond % 10 == 0) {
-      digits--;
-      subsecond /= 10;
-    }
-    String formatString = String.format("%%0%dd", digits);
-    String fractionalSeconds = String.format(formatString, subsecond);
-    return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
-  }
-
-  /**
-   * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
-   *
-   * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">
-   * "Avro format"</a> for more information.
-   */
-  public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
-    return convertGenericRecordToTableRow(record, schema.getFields());
-  }
-
-  private static TableRow convertGenericRecordToTableRow(
-      GenericRecord record, List<TableFieldSchema> fields) {
-    TableRow row = new TableRow();
-    for (TableFieldSchema subSchema : fields) {
-      // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
-      // is required, so it may not be null.
-      Field field = record.getSchema().getField(subSchema.getName());
-      Object convertedValue =
-          getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
-      if (convertedValue != null) {
-        // To match the JSON files exported by BigQuery, do not include null values in the output.
-        row.set(field.name(), convertedValue);
-      }
-    }
-    return row;
-  }
-
-  @Nullable
-  private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
-    // is optional (and so it may be null), but defaults to "NULLABLE".
-    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
-    switch (mode) {
-      case "REQUIRED":
-        return convertRequiredField(schema.getType(), fieldSchema, v);
-      case "REPEATED":
-        return convertRepeatedField(schema, fieldSchema, v);
-      case "NULLABLE":
-        return convertNullableField(schema, fieldSchema, v);
-      default:
-        throw new UnsupportedOperationException(
-            "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
-    }
-  }
-
-  private static List<Object> convertRepeatedField(
-      Schema schema, TableFieldSchema fieldSchema, Object v) {
-    Type arrayType = schema.getType();
-    verify(
-        arrayType == Type.ARRAY,
-        "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
-        fieldSchema.getName(),
-        arrayType);
-    // REPEATED fields are represented as Avro arrays.
-    if (v == null) {
-      // Handle the case of an empty repeated field.
-      return ImmutableList.of();
-    }
-    @SuppressWarnings("unchecked")
-    List<Object> elements = (List<Object>) v;
-    ImmutableList.Builder<Object> values = ImmutableList.builder();
-    Type elementType = schema.getElementType().getType();
-    for (Object element : elements) {
-      values.add(convertRequiredField(elementType, fieldSchema, element));
-    }
-    return values.build();
-  }
-
-  private static Object convertRequiredField(
-      Type avroType, TableFieldSchema fieldSchema, Object v) {
-    // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
-    // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    ImmutableMap<String, Type> fieldMap =
-        ImmutableMap.<String, Type>builder()
-            .put("STRING", Type.STRING)
-            .put("INTEGER", Type.LONG)
-            .put("FLOAT", Type.DOUBLE)
-            .put("BOOLEAN", Type.BOOLEAN)
-            .put("TIMESTAMP", Type.LONG)
-            .put("RECORD", Type.RECORD)
-            .build();
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    Type expectedAvroType = fieldMap.get(bqType);
-    verify(
-        avroType == expectedAvroType,
-        "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
-        expectedAvroType,
-        avroType,
-        bqType,
-        fieldSchema.getName());
-    switch (fieldSchema.getType()) {
-      case "STRING":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "INTEGER":
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        return ((Long) v).toString();
-      case "FLOAT":
-        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
-        return v;
-      case "BOOLEAN":
-        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
-        return v;
-      case "TIMESTAMP":
-        // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
-        // Strings with variable-precision (up to six digits) to match the JSON files export
-        // by BigQuery.
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        Double doubleValue = ((Long) v) / 1000000.0;
-        return formatTimestamp(doubleValue.toString());
-      case "RECORD":
-        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
-        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unexpected BigQuery field schema type %s for field named %s",
-                fieldSchema.getType(),
-                fieldSchema.getName()));
-    }
-  }
-
-  @Nullable
-  private static Object convertNullableField(
-      Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
-    // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
-    verify(
-        avroSchema.getType() == Type.UNION,
-        "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
-        avroSchema.getType(),
-        fieldSchema.getName());
-    List<Schema> unionTypes = avroSchema.getTypes();
-    verify(
-        unionTypes.size() == 2,
-        "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
-        fieldSchema.getName(),
-        unionTypes);
-
-    if (v == null) {
-      return null;
-    }
-
-    Type firstType = unionTypes.get(0).getType();
-    if (!firstType.equals(Type.NULL)) {
-      return convertRequiredField(firstType, fieldSchema, v);
-    }
-    return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java
deleted file mode 100644
index 6a0ccf3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link StepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link StepContext} from {@link #getOrCreateStepContext(String, String, StateSampler)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
- * <pre>
- * @Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * </pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
- * {@link #createStepContext(String, String, StateSampler)},
- * {@link #getOrCreateStepContext(String, String, StateSampler}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- */
-public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
-    implements ExecutionContext {
-
-  private Map<String, T> cachedStepContexts = new HashMap<>();
-
-  /**
-   * Implementations should override this to create the specific type
-   * of {@link StepContext} they need.
-   */
-  protected abstract T createStepContext(
-      String stepName, String transformName, StateSampler stateSampler);
-
-
-  /**
-   * Returns the {@link StepContext} associated with the given step.
-   */
-  @Override
-  public T getOrCreateStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
-    T context = cachedStepContexts.get(stepName);
-    if (context == null) {
-      context = createStepContext(stepName, transformName, stateSampler);
-      cachedStepContexts.put(stepName, context);
-    }
-    return context;
-  }
-
-  /**
-   * Returns a collection view of all of the {@link StepContext}s.
-   */
-  @Override
-  public Collection<? extends T> getAllStepContexts() {
-    return Collections.unmodifiableCollection(cachedStepContexts.values());
-  }
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output}
-   * is called.
-   */
-  @Override
-  public void noteOutput(WindowedValue<?> output) {}
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput}
-   * is called.
-   */
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
-  /**
-   * Base class for implementations of {@link ExecutionContext.StepContext}.
-   *
-   * <p>To complete a concrete subclass, implement {@link #timerInternals} and
-   * {@link #stateInternals}.
-   */
-  public abstract static class StepContext implements ExecutionContext.StepContext {
-    private final ExecutionContext executionContext;
-    private final String stepName;
-    private final String transformName;
-
-    public StepContext(ExecutionContext executionContext, String stepName, String transformName) {
-      this.executionContext = executionContext;
-      this.stepName = stepName;
-      this.transformName = transformName;
-    }
-
-    @Override
-    public String getStepName() {
-      return stepName;
-    }
-
-    @Override
-    public String getTransformName() {
-      return transformName;
-    }
-
-    @Override
-    public void noteOutput(WindowedValue<?> output) {
-      executionContext.noteOutput(output);
-    }
-
-    @Override
-    public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
-      executionContext.noteSideOutput(tag, output);
-    }
-
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window, Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Not implemented.");
-    }
-
-    @Override
-    public abstract StateInternals<?> stateInternals();
-
-    @Override
-    public abstract TimerInternals timerInternals();
-  }
-}