You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:29 UTC

[05/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java
deleted file mode 100644
index 00d3b3b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A StateSampler object may be used to obtain an approximate
- * breakdown of the time spent by an execution context in various
- * states, as a fraction of the total time.  The sampling is taken at
- * regular intervals, with adjustment for scheduling delay.
- */
-@ThreadSafe
-public class StateSampler implements AutoCloseable {
-
-  /** Different kinds of states. */
-  public enum StateKind {
-    /** IO, user code, etc. */
-    USER,
-    /** Reading/writing from/to shuffle service, etc. */
-    FRAMEWORK
-  }
-
-  public static final long DEFAULT_SAMPLING_PERIOD_MS = 200;
-
-  private final String prefix;
-  private final CounterSet.AddCounterMutator counterSetMutator;
-
-  /** Array of counters indexed by their state. */
-  private ArrayList<Counter<Long>> countersByState = new ArrayList<>();
-
-  /** Map of state name to state. */
-  private Map<String, Integer> statesByName = new HashMap<>();
-
-  /** Map of state id to kind. */
-  private Map<Integer, StateKind> kindsByState = new HashMap<>();
-
-  /** The current state. */
-  private volatile int currentState;
-
-  /** Special value of {@code currentState} that means we do not sample. */
-  public static final int DO_NOT_SAMPLE = -1;
-
-  /**
-   * A counter that increments with each state transition. May be used
-   * to detect a context being stuck in a state for some amount of
-   * time.
-   */
-  private volatile long stateTransitionCount;
-
-  /**
-   * The timestamp (in nanoseconds) corresponding to the last time the
-   * state was sampled (and recorded).
-   */
-  private long stateTimestampNs = 0;
-
-  /** Using a fixed number of timers for all StateSampler objects. */
-  private static final int NUM_EXECUTOR_THREADS = 16;
-
-  private static final ScheduledExecutorService executorService =
-      Executors.newScheduledThreadPool(NUM_EXECUTOR_THREADS,
-          new ThreadFactoryBuilder().setDaemon(true).build());
-
-  private Random rand = new Random();
-
-  private List<SamplingCallback> callbacks = new ArrayList<>();
-
-  private ScheduledFuture<?> invocationTriggerFuture = null;
-
-  private ScheduledFuture<?> invocationFuture = null;
-
-  /**
-   * Constructs a new {@link StateSampler} that can be used to obtain
-   * an approximate breakdown of the time spent by an execution
-   * context in various states, as a fraction of the total time.
-   *
-   * @param prefix the prefix of the counter names for the states
-   * @param counterSetMutator the {@link CounterSet.AddCounterMutator}
-   * used to create a counter for each distinct state
-   * @param samplingPeriodMs the sampling period in milliseconds
-   */
-  public StateSampler(String prefix,
-                      CounterSet.AddCounterMutator counterSetMutator,
-                      final long samplingPeriodMs) {
-    this.prefix = prefix;
-    this.counterSetMutator = counterSetMutator;
-    currentState = DO_NOT_SAMPLE;
-    scheduleSampling(samplingPeriodMs);
-  }
-
-  /**
-   * Constructs a new {@link StateSampler} that can be used to obtain
-   * an approximate breakdown of the time spent by an execution
-   * context in various states, as a fraction of the total time.
-   *
-   * @param prefix the prefix of the counter names for the states
-   * @param counterSetMutator the {@link CounterSet.AddCounterMutator}
-   * used to create a counter for each distinct state
-   */
-  public StateSampler(String prefix,
-                      CounterSet.AddCounterMutator counterSetMutator) {
-    this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS);
-  }
-
-  /**
-   * Called by the constructor to schedule sampling at the given period.
-   *
-   * <p>Should not be overridden by sub-classes unless they want to change
-   * or disable the automatic sampling of state.
-   */
-  protected void scheduleSampling(final long samplingPeriodMs) {
-    // Here "stratified sampling" is used, which makes sure that there's 1 uniformly chosen sampled
-    // point in every bucket of samplingPeriodMs, to prevent pathological behavior in case some
-    // states happen to occur at a similar period.
-    // The current implementation uses a fixed-rate timer with a period samplingPeriodMs as a
-    // trampoline to a one-shot random timer which fires with a random delay within
-    // samplingPeriodMs.
-    stateTimestampNs = System.nanoTime();
-    invocationTriggerFuture =
-        executorService.scheduleAtFixedRate(
-            new Runnable() {
-              @Override
-              public void run() {
-                long delay = rand.nextInt((int) samplingPeriodMs);
-                synchronized (StateSampler.this) {
-                  if (invocationFuture != null) {
-                    invocationFuture.cancel(false);
-                  }
-                  invocationFuture =
-                      executorService.schedule(
-                          new Runnable() {
-                            @Override
-                            public void run() {
-                              StateSampler.this.run();
-                            }
-                          },
-                          delay,
-                          TimeUnit.MILLISECONDS);
-                }
-              }
-            },
-            0,
-            samplingPeriodMs,
-            TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Takes one sample: attributes the time elapsed since the previous sample
-   * to the current state's counter, notifies every registered
-   * {@link SamplingCallback}, and records the new sample timestamp.
-   *
-   * <p>When the current state is {@link #DO_NOT_SAMPLE}, only the timestamp
-   * is refreshed.
-   */
-  public synchronized void run() {
-    final long nowNs = System.nanoTime();
-    final int sampledState = currentState;
-    if (sampledState == DO_NOT_SAMPLE) {
-      stateTimestampNs = nowNs;
-      return;
-    }
-
-    final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNs - stateTimestampNs);
-    final StateKind sampledKind = kindsByState.get(sampledState);
-    countersByState.get(sampledState).addValue(elapsedMs);
-    // Callbacks are invoked in registration order.
-    for (SamplingCallback callback : callbacks) {
-      callback.run(sampledState, sampledKind, elapsedMs);
-    }
-    stateTimestampNs = nowNs;
-  }
-
-  @Override
-  public synchronized void close() {
-    currentState = DO_NOT_SAMPLE;
-    if (invocationTriggerFuture != null) {
-      invocationTriggerFuture.cancel(false);
-    }
-    if (invocationFuture != null) {
-      invocationFuture.cancel(false);
-    }
-  }
-
-  /**
-   * Returns the state associated with a name; creating a new state if
-   * necessary. Using states instead of state names during state
-   * transitions is done for efficiency.
-   *
-   * <p>An empty name maps to {@link #DO_NOT_SAMPLE}. The first call for a
-   * given name registers a counter named {@code prefix + name + "-msecs"}.
-   *
-   * @param name the name for the state
-   * @param kind kind of the state, see {@link StateKind}
-   * @return the state associated with the state name
-   * @throws IllegalArgumentException if the name was previously registered
-   *     with a different kind
-   */
-  public int stateForName(String name, StateKind kind) {
-    if (name.isEmpty()) {
-      return DO_NOT_SAMPLE;
-    }
-
-    synchronized (this) {
-      Integer state = statesByName.get(name);
-      if (state == null) {
-        String counterName = prefix + name + "-msecs";
-        Counter<Long> counter = counterSetMutator.addCounter(
-            Counter.longs(counterName, Counter.AggregationKind.SUM));
-        // The new state id is the index the counter is about to occupy.
-        state = countersByState.size();
-        statesByName.put(name, state);
-        countersByState.add(counter);
-        kindsByState.put(state, kind);
-      }
-      StateKind originalKind = kindsByState.get(state);
-      if (originalKind != kind) {
-        throw new IllegalArgumentException(
-            "for state named " + name
-            + ", requested kind " + kind + " different from the original kind " + originalKind);
-      }
-      return state;
-    }
-  }
-
-  /**
-   * An internal class for representing StateSampler information
-   * typically used for debugging.
-   */
-  public static class StateSamplerInfo {
-    public final String state;
-    public final Long transitionCount;
-    public final Long stateDurationMillis;
-
-    public StateSamplerInfo(String state, Long transitionCount,
-                            Long stateDurationMillis) {
-      this.state = state;
-      this.transitionCount = transitionCount;
-      this.stateDurationMillis = stateDurationMillis;
-    }
-  }
-
-  /**
-   * Returns information about the current state of this state sampler
-   * as a {@link StateSamplerInfo} object, or null if sampling is
-   * not turned on.
-   *
-   * <p>The {@code stateDurationMillis} field of the returned object is
-   * always {@code null}.
-   *
-   * @return information about this state sampler or null if sampling is off
-   */
-  public synchronized StateSamplerInfo getInfo() {
-    // Read the volatile field exactly once. setState() is not synchronized,
-    // so a second read could observe DO_NOT_SAMPLE (-1) after the check has
-    // passed and index countersByState out of bounds.
-    final int state = currentState;
-    if (state == DO_NOT_SAMPLE) {
-      return null;
-    }
-    return new StateSamplerInfo(
-        countersByState.get(state).getName(), stateTransitionCount, null);
-  }
-
-  /**
-   * Returns the current state of this state sampler.
-   */
-  public int getCurrentState() {
-    return currentState;
-  }
-
-  /**
-   * Sets the current thread state and increments the state transition count.
-   *
-   * @param state the new state to transition to
-   * @return the previous state
-   */
-  public int setState(int state) {
-    // Updates to stateTransitionCount are always done by the same
-    // thread, making the non-atomic volatile update below safe. The
-    // count is updated first to avoid incorrectly attributing
-    // stuckness occurring in an old state to the new state.
-    long previousStateTransitionCount = this.stateTransitionCount;
-    this.stateTransitionCount = previousStateTransitionCount + 1;
-    int previousState = currentState;
-    currentState = state;
-    return previousState;
-  }
-
-  /**
-   * Sets the current thread state.
-   *
-   * @param name the name of the new state to transition to
-   * @param kind kind of the new state
-   * @return the previous state
-   */
-  public int setState(String name, StateKind kind) {
-    return setState(stateForName(name, kind));
-  }
-
-  /**
-   * Returns an AutoCloseable {@link ScopedState} that will perform a
-   * state transition to the given state, and will automatically reset
-   * the state to the prior state upon closing.
-   *
-   * @param state the new state to transition to
-   * @return a {@link ScopedState} that automatically resets the state
-   * to the prior state
-   */
-  public ScopedState scopedState(int state) {
-    return new ScopedState(this, setState(state));
-  }
-
-  /**
-   * Add a callback to the sampler.
-   * The callbacks will be executed sequentially upon {@link StateSampler#run}.
-   */
-  public synchronized void addSamplingCallback(SamplingCallback callback) {
-    callbacks.add(callback);
-  }
-
-  /** Get the counter prefix associated with this sampler. */
-  public String getPrefix() {
-    return prefix;
-  }
-
-  /**
-   * A nested class that is used to account for states and state
-   * transitions based on lexical scopes.
-   *
-   * <p>Thread-safe.
-   */
-  public class ScopedState implements AutoCloseable {
-    private StateSampler sampler;
-    private int previousState;
-
-    private ScopedState(StateSampler sampler, int previousState) {
-      this.sampler = sampler;
-      this.previousState = previousState;
-    }
-
-    @Override
-    public void close() {
-      sampler.setState(previousState);
-    }
-  }
-
-  /**
-   * Callbacks which are invoked sequentially by {@link StateSampler#run},
-   * once per sample in which the state is not {@link #DO_NOT_SAMPLE}.
-   * They should be registered via {@link #addSamplingCallback}.
-   */
-  public static interface SamplingCallback {
-    /**
-     * The entrance method of the callback, it is called in {@link StateSampler#run},
-     * once per sample. This method should be thread safe.
-     *
-     * @param state The state of the StateSampler at the time of sample.
-     * @param kind The kind associated with the state, see {@link StateKind}.
-     * @param elapsedMs Milliseconds since last sample.
-     */
-    public void run(int state, StateKind kind, long elapsedMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java
deleted file mode 100644
index c3da9ed..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities used to implement the harness that runs user code. **/
-package com.google.cloud.dataflow.sdk.util.common.worker;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java
deleted file mode 100644
index f72ba4c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.gcsfs;
-
-import com.google.api.services.storage.model.StorageObject;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.FileSystem;
-import java.nio.file.LinkOption;
-import java.nio.file.Path;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
- *
- * <p>GcsPath uses a slash ('/') as a directory separator.  Below is
- * a summary of how slashes are treated:
- * <ul>
- *   <li> A GCS bucket may not contain a slash.  An object may contain zero or
- *        more slashes.
- *   <li> A trailing slash always indicates a directory, which is compliant
- *        with POSIX.1-2008.
- *   <li> Slashes separate components of a path.  Empty components are allowed,
- *        these are represented as repeated slashes.  An empty component always
- *        refers to a directory, and always ends in a slash.
-   *   <li> {@link #getParent()} always returns a path ending in a slash, as the
- *        parent of a GcsPath is always a directory.
- *   <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
- *        applies the rules consistently and is highly recommended over any
- *        custom string concatenation.
- * </ul>
- *
- * <p>GcsPath treats all GCS objects and buckets as belonging to the same
- * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
- *
- * <p>Relative paths are not associated with any bucket.  This matches common
- * treatment of Path in which relative paths can be constructed from one
- * filesystem and appended to another filesystem.
- *
- * @see <a href=
- * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
- * >Java Tutorials: Path Operations</a>
- */
-public class GcsPath implements Path {
-
-  public static final String SCHEME = "gs";
-
-  /**
-   * Creates a GcsPath from a URI.
-   *
-   * <p>The URI must be in the form {@code gs://[bucket]/[path]}, and may not
-   * contain a port, user info, a query, or a fragment.
-   *
-   * @param uri the URI to convert
-   * @return the GcsPath parsed from the string form of {@code uri}
-   * @throws IllegalArgumentException if the URI does not use the
-   *     {@code gs} scheme, or specifies a port, user info, query or fragment
-   */
-  public static GcsPath fromUri(URI uri) {
-    // Compare from the constant side: URI.getScheme() is null for a
-    // scheme-less URI, which must be rejected as an invalid argument rather
-    // than fail with a NullPointerException.
-    Preconditions.checkArgument(SCHEME.equalsIgnoreCase(uri.getScheme()),
-        "URI: %s is not a GCS URI", uri);
-    // Preconditions message templates only substitute %s placeholders.
-    Preconditions.checkArgument(uri.getPort() == -1,
-        "GCS URI may not specify port: %s (%s)", uri, uri.getPort());
-    Preconditions.checkArgument(
-        Strings.isNullOrEmpty(uri.getUserInfo()),
-        "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
-    Preconditions.checkArgument(
-        Strings.isNullOrEmpty(uri.getQuery()),
-        "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
-    Preconditions.checkArgument(
-        Strings.isNullOrEmpty(uri.getFragment()),
-        "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
-
-    return fromUri(uri.toString());
-  }
-
-  /**
-   * Pattern that is used to parse a GCS URL.
-   *
-   * <p>This is used to separate the components.  Verification is handled
-   * separately.
-   */
-  public static final Pattern GCS_URI =
-      Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
-
-  /**
-   * Parses a GCS URI given as a string and builds the matching GcsPath.
-   *
-   * <p>The string is split with {@link #GCS_URI} rather than a URI parser,
-   * so it may accept patterns that the URI parser would not accept.
-   *
-   * @throws IllegalArgumentException if the string does not match
-   *     {@link #GCS_URI} or its scheme is not {@code gs}
-   */
-  public static GcsPath fromUri(String uri) {
-    Matcher parsed = GCS_URI.matcher(uri);
-    Preconditions.checkArgument(parsed.matches(), "Invalid GCS URI: %s", uri);
-
-    String scheme = parsed.group("SCHEME");
-    Preconditions.checkArgument(scheme.equalsIgnoreCase(SCHEME),
-        "URI: %s is not a GCS URI", uri);
-
-    String bucketName = parsed.group("BUCKET");
-    String objectName = parsed.group("OBJECT");
-    return new GcsPath(null, bucketName, objectName);
-  }
-
-  /**
-   * Pattern that is used to parse a GCS resource name.
-   */
-  private static final Pattern GCS_RESOURCE_NAME =
-      Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
-
-  /**
-   * Creates a GcsPath from a OnePlatform resource name in string form.
-   */
-  public static GcsPath fromResourceName(String name) {
-    Matcher m = GCS_RESOURCE_NAME.matcher(name);
-    Preconditions.checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
-
-    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
-  }
-
-  /**
-   * Creates a GcsPath from a {@linkplain StorageObject}.
-   */
-  public static GcsPath fromObject(StorageObject object) {
-    return new GcsPath(null, object.getBucket(), object.getName());
-  }
-
-  /**
-   * Creates a GcsPath from bucket and object components.
-   *
-   * <p>A GcsPath without a bucket name is treated as a relative path, which
-   * is a path component with no linkage to the root element.  This is similar
-   * to a Unix path that does not begin with the root marker (a slash).
-   * GCS has different naming constraints and APIs for working with buckets and
-   * objects, so these two concepts are kept separate to avoid accidental
-   * attempts to treat objects as buckets, or vice versa, as much as possible.
-   *
-   * <p>A GcsPath without an object name is a bucket reference.
-   * A bucket is always a directory, which could be used to lookup or add
-   * files to a bucket, but could not be opened as a file.
-   *
-   * <p>A GcsPath containing neither bucket nor object names is treated as
-   * the root of the GCS filesystem.  A listing on the root element would return
-   * the buckets available to the user.
-   *
-   * <p>If {@code null} is passed as either parameter, it is converted to an
-   * empty string internally for consistency.  There is no distinction between
-   * an empty string and a {@code null}, as neither are allowed by GCS.
-   *
-   * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
-   *               if the object is not associated with a bucket
-   *               (e.g. relative paths or the root node).
-   * @param object a GCS object path, or none ({@code null} or an empty string)
-   *               for no object.
-   */
-  public static GcsPath fromComponents(@Nullable String bucket,
-                                       @Nullable String object) {
-    return new GcsPath(null, bucket, object);
-  }
-
-  @Nullable
-  private FileSystem fs;
-  @Nonnull
-  private final String bucket;
-  @Nonnull
-  private final String object;
-
-  /**
-   * Constructs a GcsPath.
-   *
-   * @param fs the associated FileSystem, if any
-   * @param bucket the associated bucket, or none ({@code null} or an empty
-   *               string) for a relative path component
-   * @param object the object, which is a fully-qualified object name if bucket
-   *               was also provided, or none ({@code null} or an empty string)
-   *               for no object
-   * @throws java.lang.IllegalArgumentException if the bucket or object names
-   *         are invalid.
-   */
-  public GcsPath(@Nullable FileSystem fs,
-                 @Nullable String bucket,
-                 @Nullable String object) {
-    if (bucket == null) {
-      bucket = "";
-    }
-    Preconditions.checkArgument(!bucket.contains("/"),
-        "GCS bucket may not contain a slash");
-    Preconditions
-        .checkArgument(bucket.isEmpty()
-                || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"),
-            "GCS bucket names must contain only lowercase letters, numbers, "
-                + "dashes (-), underscores (_), and dots (.). Bucket names "
-                + "must start and end with a number or letter. "
-                + "See https://developers.google.com/storage/docs/bucketnaming "
-                + "for more details.  Bucket name: " + bucket);
-
-    if (object == null) {
-      object = "";
-    }
-    Preconditions.checkArgument(
-        object.indexOf('\n') < 0 && object.indexOf('\r') < 0,
-        "GCS object names must not contain Carriage Return or "
-            + "Line Feed characters.");
-
-    this.fs = fs;
-    this.bucket = bucket;
-    this.object = object;
-  }
-
-  /**
-   * Returns the bucket name associated with this GCS path, or an empty string
-   * if this is a relative path component.
-   */
-  public String getBucket() {
-    return bucket;
-  }
-
-  /**
-   * Returns the object name associated with this GCS path, or an empty string
-   * if no object is specified.
-   */
-  public String getObject() {
-    return object;
-  }
-
-  public void setFileSystem(FileSystem fs) {
-    this.fs = fs;
-  }
-
-  @Override
-  public FileSystem getFileSystem() {
-    return fs;
-  }
-
-  // A path is absolute when it names a bucket, or when it is the root path
-  // (neither bucket nor object). Only an object without a bucket is relative.
-  @Override
-  public boolean isAbsolute() {
-    return !bucket.isEmpty() || object.isEmpty();
-  }
-
-  @Override
-  public GcsPath getRoot() {
-    return new GcsPath(fs, "", "");
-  }
-
-  @Override
-  public GcsPath getFileName() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Returns the <em>parent path</em>, or {@code null} if this path does not
-   * have a parent.
-   *
-   * <p>Returns a path that ends in '/', as the parent path always refers to
-   * a directory.
-   *
-   * <p>The root path has no parent; the parent of a bucket is the root; a
-   * relative path with no separator before its last component has no parent.
-   *
-   * @return the parent path, or {@code null} if there is none
-   */
-  @Override
-  public GcsPath getParent() {
-    if (bucket.isEmpty() && object.isEmpty()) {
-      // The root path has no parent, by definition.
-      return null;
-    }
-
-    if (object.isEmpty()) {
-      // A GCS bucket. All buckets come from a common root.
-      return getRoot();
-    }
-
-    // Skip last character, in case it is a trailing slash.
-    int i = object.lastIndexOf('/', object.length() - 2);
-    // NOTE(review): i == 0 means the object starts with '/', i.e. has a
-    // leading empty component; it is handled like "no separator found"
-    // (i == -1). Confirm that dropping the empty component is intended.
-    if (i <= 0) {
-      if (bucket.isEmpty()) {
-        // Relative paths are not attached to the root node.
-        return null;
-      }
-      return new GcsPath(fs, bucket, "");
-    }
-
-    // Retain trailing slash.
-    return new GcsPath(fs, bucket, object.substring(0, i + 1));
-  }
-
-  /**
-   * Counts the name elements of this path: one for the bucket (if any), one
-   * per slash in the object, and one more for a final component that is not
-   * terminated by a slash.
-   */
-  @Override
-  public int getNameCount() {
-    int components = bucket.isEmpty() ? 0 : 1;
-    if (object.isEmpty()) {
-      return components;
-    }
-
-    // Every separator closes one component.
-    for (int pos = object.indexOf('/'); pos != -1; pos = object.indexOf('/', pos + 1)) {
-      components++;
-    }
-
-    // A trailing component without a closing slash still counts.
-    if (!object.endsWith("/")) {
-      components++;
-    }
-    return components;
-  }
-
-  @Override
-  public GcsPath getName(int count) {
-    Preconditions.checkArgument(count >= 0);
-
-    Iterator<Path> iterator = iterator();
-    for (int i = 0; i < count; ++i) {
-      Preconditions.checkArgument(iterator.hasNext());
-      iterator.next();
-    }
-
-    Preconditions.checkArgument(iterator.hasNext());
-    return (GcsPath) iterator.next();
-  }
-
-  @Override
-  public GcsPath subpath(int beginIndex, int endIndex) {
-    Preconditions.checkArgument(beginIndex >= 0);
-    Preconditions.checkArgument(endIndex > beginIndex);
-
-    Iterator<Path> iterator = iterator();
-    for (int i = 0; i < beginIndex; ++i) {
-      Preconditions.checkArgument(iterator.hasNext());
-      iterator.next();
-    }
-
-    GcsPath path = null;
-    while (beginIndex < endIndex) {
-      Preconditions.checkArgument(iterator.hasNext());
-      if (path == null) {
-        path = (GcsPath) iterator.next();
-      } else {
-        path = path.resolve(iterator.next());
-      }
-      ++beginIndex;
-    }
-
-    return path;
-  }
-
-  @Override
-  public boolean startsWith(Path other) {
-    if (other instanceof GcsPath) {
-      GcsPath gcsPath = (GcsPath) other;
-      return startsWith(gcsPath.bucketAndObject());
-    } else {
-      return startsWith(other.toString());
-    }
-  }
-
-  @Override
-  public boolean startsWith(String prefix) {
-    return bucketAndObject().startsWith(prefix);
-  }
-
-  @Override
-  public boolean endsWith(Path other) {
-    if (other instanceof GcsPath) {
-      GcsPath gcsPath = (GcsPath) other;
-      return endsWith(gcsPath.bucketAndObject());
-    } else {
-      return endsWith(other.toString());
-    }
-  }
-
-  @Override
-  public boolean endsWith(String suffix) {
-    return bucketAndObject().endsWith(suffix);
-  }
-
-  // TODO: support "." and ".." path components?
-  @Override
-  public GcsPath normalize() {
-    return this;
-  }
-
-  @Override
-  public GcsPath resolve(Path other) {
-    if (other instanceof GcsPath) {
-      GcsPath path = (GcsPath) other;
-      if (path.isAbsolute()) {
-        return path;
-      } else {
-        return resolve(path.getObject());
-      }
-    } else {
-      return resolve(other.toString());
-    }
-  }
-
-  /**
-   * Resolves {@code other} against this path.
-   *
-   * <p>On the root path, or when {@code other} already starts with
-   * {@code gs://}, the argument is parsed as a full GCS URI and inherits this
-   * path's file system. Otherwise it is appended to this path's object,
-   * separated by exactly one slash; an empty argument appends a slash.
-   */
-  @Override
-  public GcsPath resolve(String other) {
-    final String uriPrefix = SCHEME + "://";
-    String target = other;
-
-    // Resolve on a root path is equivalent to looking up a bucket and object.
-    if (bucket.isEmpty() && object.isEmpty()) {
-      target = uriPrefix + target;
-    }
-
-    if (target.startsWith(uriPrefix)) {
-      GcsPath resolved = GcsPath.fromUri(target);
-      resolved.setFileSystem(getFileSystem());
-      return resolved;
-    }
-
-    // An empty component MUST refer to a directory.
-    if (target.isEmpty()) {
-      target = "/";
-    }
-
-    final String joined;
-    if (object.isEmpty()) {
-      joined = target;
-    } else if (object.endsWith("/")) {
-      joined = object + target;
-    } else {
-      joined = object + "/" + target;
-    }
-    return new GcsPath(fs, bucket, joined);
-  }
-
-  @Override
-  public Path resolveSibling(Path other) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Path resolveSibling(String other) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Path relativize(Path other) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public GcsPath toAbsolutePath() {
-    return this;
-  }
-
-  @Override
-  public GcsPath toRealPath(LinkOption... options) throws IOException {
-    return this;
-  }
-
-  @Override
-  public File toFile() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
-      WatchEvent.Modifier... modifiers) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
-      throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Iterator<Path> iterator() {
-    return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
-  }
-
-  /**
-   * Iterates over the slash-separated components of a path string, yielding
-   * one single-component {@link GcsPath} per call to {@link #next()}.
-   *
-   * <p>When {@code fullPath} is set, the first component is returned as a
-   * bucket; every other component is returned as a bucket-less object.
-   */
-  private static class NameIterator implements Iterator<Path> {
-    private final FileSystem fs;
-    private boolean fullPath;
-    private String name;
-
-    NameIterator(FileSystem fs, boolean fullPath, String name) {
-      this.fs = fs;
-      this.fullPath = fullPath;
-      this.name = name;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return !Strings.isNullOrEmpty(name);
-    }
-
-    /**
-     * Returns the next component.
-     *
-     * @throws java.util.NoSuchElementException if no components remain
-     */
-    @Override
-    public GcsPath next() {
-      // Honor the Iterator contract; without this guard an exhausted
-      // iterator would dereference the null remainder.
-      if (!hasNext()) {
-        throw new java.util.NoSuchElementException();
-      }
-      int i = name.indexOf('/');
-      String component;
-      if (i >= 0) {
-        component = name.substring(0, i);
-        name = name.substring(i + 1);
-      } else {
-        component = name;
-        name = null;
-      }
-      if (fullPath) {
-        // Only the first component of an absolute path is the bucket.
-        fullPath = false;
-        return new GcsPath(fs, component, "");
-      } else {
-        // Relative paths have no bucket.
-        return new GcsPath(fs, "", component);
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /**
-   * Orders paths by bucket first, then component by component, with a path
-   * that runs out of components first sorting before the longer one.
-   *
-   * @throws ClassCastException if {@code other} is not a GcsPath
-   */
-  @Override
-  public int compareTo(Path other) {
-    if (!(other instanceof GcsPath)) {
-      throw new ClassCastException();
-    }
-    GcsPath that = (GcsPath) other;
-
-    int byBucket = bucket.compareTo(that.bucket);
-    if (byBucket != 0) {
-      return byBucket;
-    }
-
-    // Compare a component at a time, so that the separator char doesn't
-    // get compared against component contents.  Eg, "a/b" < "a-1/b".
-    Iterator<Path> mine = iterator();
-    Iterator<Path> theirs = that.iterator();
-    while (mine.hasNext() && theirs.hasNext()) {
-      int byComponent = mine.next().toString().compareTo(theirs.next().toString());
-      if (byComponent != 0) {
-        return byComponent;
-      }
-    }
-
-    // At least one side is exhausted here; the side with leftovers is greater.
-    if (mine.hasNext()) {
-      return 1;
-    }
-    return theirs.hasNext() ? -1 : 0;
-  }
-
-  /**
-   * Two GcsPaths are equal when they are of the same class and have equal
-   * bucket and object names; the file system is not compared.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (o == null || o.getClass() != getClass()) {
-      return false;
-    }
-    GcsPath that = (GcsPath) o;
-    if (!bucket.equals(that.bucket)) {
-      return false;
-    }
-    return object.equals(that.object);
-  }
-
-  /** Combines the bucket and object hashes, matching the fields used by equals. */
-  @Override
-  public int hashCode() {
-    return 31 * bucket.hashCode() + object.hashCode();
-  }
-
-  /**
-   * Renders a relative path as its bare object name and an absolute path as
-   * a {@code gs://} URI string.
-   */
-  @Override
-  public String toString() {
-    if (!isAbsolute()) {
-      return object;
-    }
-    String prefix = SCHEME + "://";
-    if (bucket.isEmpty()) {
-      return prefix + object;
-    }
-    return prefix + bucket + '/' + object;
-  }
-
-  // TODO: Consider using resource names for all GCS paths used by the SDK.
-  /**
-   * Renders this path as a resource name of the form
-   * {@code storage.googleapis.com/[bucket/]object}.
-   */
-  public String toResourceName() {
-    String host = "storage.googleapis.com/";
-    if (bucket.isEmpty()) {
-      return host + object;
-    }
-    return host + bucket + '/' + object;
-  }
-
-  /**
-   * Converts this path to a {@code gs:} URI built from the bucket and object.
-   *
-   * @return the URI for this path
-   * @throws RuntimeException wrapping the {@link URISyntaxException} if the
-   *     path cannot be expressed as a URI
-   */
-  @Override
-  public URI toUri() {
-    try {
-      return new URI(SCHEME, "//" + bucketAndObject(), null);
-    } catch (URISyntaxException e) {
-      // Chain the cause so the parser's reason and index are not lost.
-      throw new RuntimeException("Unable to create URI for GCS path " + this, e);
-    }
-  }
-
-  /**
-   * Joins bucket and object with a slash, or returns just the object when
-   * there is no bucket.
-   */
-  private String bucketAndObject() {
-    return bucket.isEmpty() ? object : bucket + "/" + object;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java
deleted file mode 100644
index 2f57938..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities used to interact with Google Cloud Storage. **/
-package com.google.cloud.dataflow.sdk.util.gcsfs;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java
deleted file mode 100644
index c92adab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities used by the Dataflow SDK. **/
-package com.google.cloud.dataflow.sdk.util;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java
deleted file mode 100644
index 0d78b13..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link CombiningState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
-    extends CombiningState<InputT, OutputT> {
-
-  /**
-   * Read the merged accumulator for this combining value. It is implied that reading the
-   * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for
-   * this.
-   */
-  AccumT getAccum();
-
-  /**
-   * Add an accumulator to this combining value. Depending on implementation this may immediately
-   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
-   */
-  void addAccum(AccumT accum);
-
-  /**
-   * Merge the given accumulators according to the underlying combiner.
-   */
-  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
-  @Override
-  AccumulatorCombiningState<InputT, AccumT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java
deleted file mode 100644
index 363e480..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-/**
- * State containing a bag of values. Items can be added to the bag and the contents read out.
- *
- * @param <T> The type of elements in the bag.
- */
-public interface BagState<T> extends CombiningState<T, Iterable<T>> {
-  @Override
-  BagState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java
deleted file mode 100644
index 673bebb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State {
-  /**
-   * Add a value to the buffer.
-   */
-  void add(InputT value);
-
-  /**
-   * Return true if this state is empty.
-   */
-  ReadableState<Boolean> isEmpty();
-
-  @Override
-  CombiningState<InputT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java
deleted file mode 100644
index 3683b74..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.CombineFnUtil;
-import com.google.cloud.dataflow.sdk.util.state.InMemoryStateInternals.InMemoryState;
-import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link StateInternals} built on top of an underlying {@link StateTable} that contains instances
- * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
- * accessed, an independent copy will be created within this table.
- */
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> {
-  private final K key;
-  private final CopyOnAccessInMemoryStateTable<K> table;
-
-  /**
-   * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null)
-   * StateInternals.
-   */
-  public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(
-      K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
-    return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
-  }
-
-  private CopyOnAccessInMemoryStateInternals(
-      K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
-    this.key = key;
-    table =
-        new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
-  }
-
-  /**
-   * Ensures this {@link CopyOnAccessInMemoryStateInternals} is complete. Other copies of state for
-   * the same Step and Key may be discarded after invoking this method.
-   *
-   * <p>For each {@link StateNamespace}, for each {@link StateTag address} in that namespace that
-   * has not been bound in this {@link CopyOnAccessInMemoryStateInternals}, put a reference to that
-   * state within this {@link StateInternals}.
-   *
-   * <p>Additionally, stores the earliest hold {@link Instant} read from any
-   * {@link WatermarkHoldState} bound in the state table after the commit is completed, enabling
-   * calls to {@link #getEarliestWatermarkHold()}.
-   *
-   * @return this {@link CopyOnAccessInMemoryStateInternals}
-   */
-  public CopyOnAccessInMemoryStateInternals<K> commit() {
-    table.commit();
-    return this;
-  }
-
-  /**
-   * Gets the earliest Watermark Hold present in this table.
-   *
-   * <p>Must be called after this state has been committed. Will throw an
-   * {@link IllegalStateException} if the state has not been committed.
-   */
-  public Instant getEarliestWatermarkHold() {
-    // After commit, the watermark hold is always present, but may be
-    // BoundedWindow#TIMESTAMP_MAX_VALUE if there is no hold set.
-    checkState(
-        table.earliestWatermarkHold.isPresent(),
-        "Can't get the earliest watermark hold in a %s before it is committed",
-        getClass().getSimpleName());
-    return table.earliestWatermarkHold.get();
-  }
-
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
-    return table.get(namespace, address, c);
-  }
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  public boolean isEmpty() {
-    return Iterables.isEmpty(table.values());
-  }
-
-  /**
-   * A {@link StateTable} that, when a value is retrieved with
-   * {@link StateTable#get(StateNamespace, StateTag)}, first attempts to obtain a copy of existing
-   * {@link State} from an underlying {@link StateTable}.
-   */
-  private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> {
-    private final K key;
-    private Optional<StateTable<K>> underlying;
-
-    /**
-     * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}.
-     *
-     * <p>There are three {@link StateBinderFactory} implementations used by the {@link
-     * CopyOnAccessInMemoryStateTable}.
-     * <ul>
-     *   <li>The default {@link StateBinderFactory} is a {@link CopyOnBindBinderFactory}, allowing
-     *       the table to copy any existing {@link State} values to this {@link StateTable} from the
-     *       underlying table when accessed, at which point mutations will not be visible to the
-     *       underlying table - effectively a "Copy by Value" binder.</li>
-     *   <li>During the execution of the {@link #commit()} method, this is a
-     *       {@link ReadThroughBinderFactory}, which copies the references to the existing
-     *       {@link State} objects to this {@link StateTable}.</li>
-     *   <li>After the execution of the {@link #commit()} method, this is an
-     *       instance of {@link InMemoryStateBinderFactory}, which constructs new instances of state
-     *       when a {@link StateTag} is bound.</li>
-     * </ul>
-     */
-    private StateBinderFactory<K> binderFactory;
-
-    /**
-     * The earliest watermark hold in this table.
-     */
-    private Optional<Instant> earliestWatermarkHold;
-
-    public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
-      this.key = key;
-      this.underlying = Optional.fromNullable(underlying);
-      binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying);
-      earliestWatermarkHold = Optional.absent();
-    }
-
-    /**
-     * Copies all values in the underlying table to this table, then discards the underlying table.
-     *
-     * <p>If there is an underlying table, this replaces the existing
-     * {@link CopyOnBindBinderFactory} with a {@link ReadThroughBinderFactory}, then reads all of
-     * the values in the existing table, binding the state values to this table. The old StateTable
-     * should be discarded after the call to {@link #commit()}.
-     *
-     * <p>After copying all of the existing values, replace the binder factory with an instance of
-     * {@link InMemoryStateBinderFactory} to construct new values, since all existing values
-     * are bound in this {@link StateTable table} and this table represents the canonical state.
-     */
-    private void commit() {
-      Instant earliestHold = getEarliestWatermarkHold();
-      if (underlying.isPresent()) {
-        ReadThroughBinderFactory<K> readThroughBinder =
-            new ReadThroughBinderFactory<>(underlying.get());
-        binderFactory = readThroughBinder;
-        Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
-        if (earliestUnderlyingHold.isBefore(earliestHold)) {
-          earliestHold = earliestUnderlyingHold;
-        }
-      }
-      earliestWatermarkHold = Optional.of(earliestHold);
-      clearEmpty();
-      binderFactory = new InMemoryStateBinderFactory<>(key);
-      underlying = Optional.absent();
-    }
-
-    /**
-     * Get the earliest watermark hold in this table. Ignores the contents of any underlying table.
-     */
-    private Instant getEarliestWatermarkHold() {
-      Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (State existingState : this.values()) {
-        if (existingState instanceof WatermarkHoldState) {
-          Instant hold = ((WatermarkHoldState<?>) existingState).read();
-          if (hold != null && hold.isBefore(earliest)) {
-            earliest = hold;
-          }
-        }
-      }
-      return earliest;
-    }
-
-    /**
-     * Clear all empty {@link StateNamespace StateNamespaces} from this table. If all states are
-     * empty, clear the entire table.
-     *
-     * <p>Because {@link InMemoryState} is not removed from the {@link StateTable} after it is
-     * cleared, in case contents are modified after being cleared, the table must be explicitly
-     * checked to ensure that it contains state and removed if not (otherwise we may never use
-     * the table again).
-     */
-    private void clearEmpty() {
-      Collection<StateNamespace> emptyNamespaces = new HashSet<>(this.getNamespacesInUse());
-      for (StateNamespace namespace : this.getNamespacesInUse()) {
-        for (State existingState : this.getTagsInUse(namespace).values()) {
-          if (!((InMemoryState<?>) existingState).isCleared()) {
-            emptyNamespaces.remove(namespace);
-            break;
-          }
-        }
-      }
-      for (StateNamespace empty : emptyNamespaces) {
-        this.clearNamespace(empty);
-      }
-    }
-
-    @Override
-    protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
-      return binderFactory.forNamespace(namespace, c);
-    }
-
-    private static interface StateBinderFactory<K> {
-      StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c);
-    }
-
-    /**
-     * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound.
-     */
-    private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> {
-      private final K key;
-      private final Optional<StateTable<K>> underlying;
-
-      public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
-        this.key = key;
-        this.underlying = underlying;
-      }
-
-      private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
-        return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace)
-            && underlying.get().getTagsInUse(namespace).containsKey(tag);
-      }
-
-      @Override
-      public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
-        return new StateBinder<K>() {
-          @Override
-          public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-              StateTag<? super K, WatermarkHoldState<W>> address,
-              OutputTimeFn<? super W> outputTimeFn) {
-            if (containedInUnderlying(namespace, address)) {
-              @SuppressWarnings("unchecked")
-              InMemoryState<? extends WatermarkHoldState<W>> existingState =
-                  (InMemoryStateInternals.InMemoryState<? extends WatermarkHoldState<W>>)
-                  underlying.get().get(namespace, address, c);
-              return existingState.copy();
-            } else {
-              return new InMemoryStateInternals.InMemoryWatermarkHold<>(
-                  outputTimeFn);
-            }
-          }
-
-          @Override
-          public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-            if (containedInUnderlying(namespace, address)) {
-              @SuppressWarnings("unchecked")
-              InMemoryState<? extends ValueState<T>> existingState =
-                  (InMemoryStateInternals.InMemoryState<? extends ValueState<T>>)
-                  underlying.get().get(namespace, address, c);
-              return existingState.copy();
-            } else {
-              return new InMemoryStateInternals.InMemoryValue<>();
-            }
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-              bindCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-            if (containedInUnderlying(namespace, address)) {
-              @SuppressWarnings("unchecked")
-              InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
-                  existingState = (
-                      InMemoryStateInternals
-                          .InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
-                          OutputT>>) underlying.get().get(namespace, address, c);
-              return existingState.copy();
-            } else {
-              return new InMemoryStateInternals.InMemoryCombiningValue<>(
-                  key, combineFn.asKeyedFn());
-            }
-          }
-
-          @Override
-          public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-            if (containedInUnderlying(namespace, address)) {
-              @SuppressWarnings("unchecked")
-              InMemoryState<? extends BagState<T>> existingState =
-                  (InMemoryStateInternals.InMemoryState<? extends BagState<T>>)
-                  underlying.get().get(namespace, address, c);
-              return existingState.copy();
-            } else {
-              return new InMemoryStateInternals.InMemoryBag<>();
-            }
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-              bindKeyedCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-            if (containedInUnderlying(namespace, address)) {
-              @SuppressWarnings("unchecked")
-              InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
-                  existingState = (
-                      InMemoryStateInternals
-                          .InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
-                          OutputT>>) underlying.get().get(namespace, address, c);
-              return existingState.copy();
-            } else {
-              return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn);
-            }
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-          bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-            return bindKeyedCombiningValue(
-                address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
-          }
-        };
-      }
-    }
-
-    /**
-     * {@link StateBinderFactory} that reads directly from the underlying table. Used during calls
-     * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from
-     * the underlying table.
-     */
-    private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> {
-      private final StateTable<K> underlying;
-
-      public ReadThroughBinderFactory(StateTable<K> underlying) {
-        this.underlying = underlying;
-      }
-
-      public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
-        Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-        for (StateNamespace namespace : underlying.getNamespacesInUse()) {
-          for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState :
-              underlying.getTagsInUse(namespace).entrySet()) {
-            if (!((InMemoryState<?>) existingState.getValue()).isCleared()) {
-              // Only read through non-cleared values to ensure that completed windows are
-              // eventually discarded, and remember the earliest watermark hold from among those
-              // values.
-              State state =
-                  readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
-              if (state instanceof WatermarkHoldState) {
-                Instant hold = ((WatermarkHoldState<?>) state).read();
-                if (hold != null && hold.isBefore(earliestHold)) {
-                  earliestHold = hold;
-                }
-              }
-            }
-          }
-        }
-        return earliestHold;
-      }
-
-      @Override
-      public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
-        return new StateBinder<K>() {
-          @Override
-          public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-              StateTag<? super K, WatermarkHoldState<W>> address,
-              OutputTimeFn<? super W> outputTimeFn) {
-            return underlying.get(namespace, address, c);
-          }
-
-          @Override
-          public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-            return underlying.get(namespace, address, c);
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-              bindCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-            return underlying.get(namespace, address, c);
-          }
-
-          @Override
-          public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-            return underlying.get(namespace, address, c);
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-              bindKeyedCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-            return underlying.get(namespace, address, c);
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-          bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-            return bindKeyedCombiningValue(
-                address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
-          }
-        };
-      }
-    }
-
-    private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> {
-      private final K key;
-
-      public InMemoryStateBinderFactory(K key) {
-        this.key = key;
-      }
-
-      @Override
-      public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
-        return new InMemoryStateInternals.InMemoryStateBinder<>(key, c);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java
deleted file mode 100644
index 8404801..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.CombineFnUtil;
-import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext}
- * and for running tests that need state.
- */
-@Experimental(Kind.STATE)
-public class InMemoryStateInternals<K> implements StateInternals<K> {
-
-  public static <K> InMemoryStateInternals<K> forKey(K key) {
-    return new InMemoryStateInternals<>(key);
-  }
-
-  private final K key;
-
-  protected InMemoryStateInternals(K key) {
-    this.key = key;
-  }
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  interface InMemoryState<T extends InMemoryState<T>> {
-    boolean isCleared();
-    T copy();
-  }
-
-  protected final StateTable<K> inMemoryState = new StateTable<K>() {
-    @Override
-    protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
-      return new InMemoryStateBinder<K>(key, c);
-    }
-  };
-
-  public void clear() {
-    inMemoryState.clear();
-  }
-
-  /**
-   * Return true if the given state is empty. This is used by the test framework to make sure
-   * that the state has been properly cleaned up.
-   */
-  protected boolean isEmptyForTesting(State state) {
-    return ((InMemoryState<?>) state).isCleared();
-  }
-
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
-    return inMemoryState.get(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
-    return inMemoryState.get(namespace, address, c);
-  }
-
-  /**
-   * A {@link StateBinder} that returns In Memory {@link State} objects.
-   */
-  static class InMemoryStateBinder<K> implements StateBinder<K> {
-    private final K key;
-    private final StateContext<?> c;
-
-    InMemoryStateBinder(K key, StateContext<?> c) {
-      this.key = key;
-      this.c = c;
-    }
-
-    @Override
-    public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-      return new InMemoryValue<T>();
-    }
-
-    @Override
-    public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-      return new InMemoryBag<T>();
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
-    }
-
-    @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new InMemoryWatermarkHold<W>(outputTimeFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
-    }
-  }
-
-  static final class InMemoryValue<T> implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
-    private boolean isCleared = true;
-    private T value = null;
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this Value.
-      value = null;
-      isCleared = true;
-    }
-
-    @Override
-    public InMemoryValue<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public void write(T input) {
-      isCleared = false;
-      this.value = input;
-    }
-
-    @Override
-    public InMemoryValue<T> copy() {
-      InMemoryValue<T> that = new InMemoryValue<>();
-      if (!this.isCleared) {
-        that.isCleared = this.isCleared;
-        that.value = this.value;
-      }
-      return that;
-    }
-
-    @Override
-    public boolean isCleared() {
-      return isCleared;
-    }
-  }
-
-  static final class InMemoryWatermarkHold<W extends BoundedWindow>
-      implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
-
-    private final OutputTimeFn<? super W> outputTimeFn;
-
-    @Nullable
-    private Instant combinedHold = null;
-
-    public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public InMemoryWatermarkHold<W> readLater() {
-      return this;
-    }
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this WatermarkHold.
-      combinedHold = null;
-    }
-
-    @Override
-    public Instant read() {
-      return combinedHold;
-    }
-
-    @Override
-    public void add(Instant outputTime) {
-      combinedHold = combinedHold == null ? outputTime
-          : outputTimeFn.combine(combinedHold, outputTime);
-    }
-
-    @Override
-    public boolean isCleared() {
-      return combinedHold == null;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-        @Override
-        public Boolean read() {
-          return combinedHold == null;
-        }
-      };
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toString(combinedHold);
-    }
-
-    @Override
-    public InMemoryWatermarkHold<W> copy() {
-      InMemoryWatermarkHold<W> that =
-          new InMemoryWatermarkHold<>(outputTimeFn);
-      that.combinedHold = this.combinedHold;
-      return that;
-    }
-  }
-
-  static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
-          InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
-    private final K key;
-    private boolean isCleared = true;
-    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-    private AccumT accum;
-
-    InMemoryCombiningValue(
-        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      this.key = key;
-      this.combineFn = combineFn;
-      accum = combineFn.createAccumulator(key);
-    }
-
-    @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this CombiningValue.
-      accum = combineFn.createAccumulator(key);
-      isCleared = true;
-    }
-
-    @Override
-    public OutputT read() {
-      return combineFn.extractOutput(key, accum);
-    }
-
-    @Override
-    public void add(InputT input) {
-      isCleared = false;
-      accum = combineFn.addInput(key, accum, input);
-    }
-
-    @Override
-    public AccumT getAccum() {
-      return accum;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-        @Override
-        public Boolean read() {
-          return isCleared;
-        }
-      };
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      isCleared = false;
-      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public boolean isCleared() {
-      return isCleared;
-    }
-
-    @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
-      InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
-          new InMemoryCombiningValue<>(key, combineFn);
-      if (!this.isCleared) {
-        that.isCleared = this.isCleared;
-        that.addAccum(accum);
-      }
-      return that;
-    }
-  }
-
-  static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> {
-    private List<T> contents = new ArrayList<>();
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this Bag.
-      // The result of get/read below must be stable for the lifetime of the bundle within which it
-      // was generated. In batch and direct runners the bundle lifetime can be
-      // greater than the window lifetime, in which case this method can be called while
-      // the result is still in use. We protect against this by hot-swapping instead of
-      // clearing the contents.
-      contents = new ArrayList<>();
-    }
-
-    @Override
-    public InMemoryBag<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      return contents;
-    }
-
-    @Override
-    public void add(T input) {
-      contents.add(input);
-    }
-
-    @Override
-    public boolean isCleared() {
-      return contents.isEmpty();
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return contents.isEmpty();
-        }
-      };
-    }
-
-    @Override
-    public InMemoryBag<T> copy() {
-      InMemoryBag<T> that = new InMemoryBag<>();
-      that.contents.addAll(this.contents);
-      return that;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java
deleted file mode 100644
index 40211d7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-
-import java.util.Map;
-
-/**
- * Interface for accessing persistent state while windows are merging.
- *
- * <p>For internal use only.
- */
-@Experimental(Kind.STATE)
-public interface MergingStateAccessor<K, W extends BoundedWindow>
-    extends StateAccessor<K> {
-  /**
-   * Analogous to {@link #access}, but returned as a map from each window which is
-   * about to be merged to the corresponding state. Only includes windows which
-   * are known to have state.
-   */
-  <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-      StateTag<? super K, StateT> address);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java
deleted file mode 100644
index 8f690a3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-
-/**
- * A {@code ReadableState} is produced by the read methods on all {@link State} objects.
- * Calling {@link #read} returns the associated value.
- *
- * <p>This class is similar to {@link java.util.concurrent.Future}, but each invocation of
- * {@link #read} need not return the same value.
- *
- * <p>Getting the {@code ReadableState} from a read method indicates the desire to eventually
- * read a value. Depending on the runner this may or may not immediately start the read.
- *
- * @param <T> The type of value returned by {@link #read}.
- */
-@Experimental(Kind.STATE)
-public interface ReadableState<T> {
-  /**
-   * Read the current value, blocking until it is available.
-   *
-   * <p>If there will be many calls to {@link #read} for different state in short succession,
-   * you should first call {@link #readLater} for all of them so the reads can potentially be
-   * batched (depending on the underlying {@link StateInternals} implementation).
-   */
-  T read();
-
-  /**
-   * Indicate that the value will be read later.
-   *
-   * <p>This allows a {@link StateInternals} implementation to start an asynchronous prefetch or
-   * to include this state in the next batch of reads.
-   *
-   * @return this for convenient chaining
-   */
-  ReadableState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java
deleted file mode 100644
index 0cef786..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-/**
- * Base interface for all state locations.
- *
- * <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link CombiningState}.
- */
-public interface State {
-
-  /**
-   * Clear out the state location.
-   */
-  void clear();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java
deleted file mode 100644
index 6cfbecf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-
-/**
- * Interface for accessing a {@link StateTag} in the current context.
- *
- * <p>For internal use only.
- */
-@Experimental(Kind.STATE)
-public interface StateAccessor<K> {
-  /**
-   * Access the storage for the given {@code address} in the current window.
-   *
-   * <p>Never accounts for merged windows. When windows are merged, any state accessed via
-   * this method must be eagerly combined and written into the result window.
-   */
-  <StateT extends State> StateT access(StateTag<? super K, StateT> address);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java
deleted file mode 100644
index 96387d8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-/**
- * Information accessible to the state API.
- */
-public interface StateContext<W extends BoundedWindow> {
-  /**
-   * Returns the {@code PipelineOptions} specified with the
-   * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}.
-   */
-  public abstract PipelineOptions getPipelineOptions();
-
-  /**
-   * Returns the value of the side input for the corresponding state window.
-   */
-  public abstract <T> T sideInput(PCollectionView<T> view);
-
-  /**
-   * Returns the window corresponding to the state.
-   */
-  public abstract W window();
-}