You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:28 UTC

[04/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java
deleted file mode 100644
index e301d43..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import javax.annotation.Nullable;
-
-/**
- * Static factories for the {@link StateContext} variants handed to state accessors.
- */
-public class StateContexts {
-  private static final StateContext<BoundedWindow> NULL_CONTEXT =
-      new StateContext<BoundedWindow>() {
-        @Override
-        public PipelineOptions getPipelineOptions() {
-          throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view) {
-          throw new IllegalArgumentException("cannot call sideInput() in a null context");
-        }
-
-        @Override
-        public BoundedWindow window() {
-          throw new IllegalArgumentException("cannot call window() in a null context");
-        }
-      };
-
-  /**
-   * Returns a placeholder {@link StateContext}; every accessor on it throws
-   * {@link IllegalArgumentException}.
-   */
-  public static <W extends BoundedWindow> StateContext<W> nullContext() {
-    @SuppressWarnings("unchecked")
-    StateContext<W> typedNullContext = (StateContext<W>) NULL_CONTEXT;
-    return typedNullContext;
-  }
-
-  /**
-   * Returns a {@link StateContext} that knows only {@code window}; its other accessors throw.
-   */
-  public static <W extends BoundedWindow> StateContext<W> windowOnly(final W window) {
-    return new StateContext<W>() {
-      @Override
-      public W window() {
-        return window;
-      }
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        throw new IllegalArgumentException(
-            "cannot call getPipelineOptions() in a window only context");
-      }
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        throw new IllegalArgumentException("cannot call sideInput() in a window only context");
-      }
-    };
-  }
-
-  /**
-   * Returns a {@link StateContext} assembled from {@code options}, {@code windowingInternals}
-   * and {@code window}; when {@code options} is {@code null} the shared throwing context from
-   * {@link #nullContext()} is returned instead.
-   */
-  public static <W extends BoundedWindow> StateContext<W> createFromComponents(
-      @Nullable final PipelineOptions options,
-      final WindowingInternals<?, ?> windowingInternals,
-      final W window) {
-    if (options == null) {
-      return nullContext();
-    }
-    return new StateContext<W>() {
-      @Override
-      public W window() {
-        return window;
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return options;
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return windowingInternals.sideInput(view, window);
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java
deleted file mode 100644
index b31afb4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-
-/**
- * {@code StateInternals} describes the functionality a runner needs to provide for the
- * State API to be supported.
- *
- * <p>The SDK will only use this after elements have been partitioned by key, for instance
- * after a {@link GroupByKey} operation. The runner implementation must ensure that any writes
- * using {@link StateInternals} are implicitly scoped to the key being processed and the
- * specific step accessing state.
- *
- * <p>The runner implementation must also ensure that any writes to the associated state objects
- * are persisted together with the completion status of the processing that produced these
- * writes.
- *
- * <p>This is a low-level API intended for use by the Dataflow SDK. It should not be
- * used directly, and is highly likely to change.
- */
-@Experimental(Kind.STATE)
-public interface StateInternals<K> {
-
-  /** Returns the key for this {@link StateInternals}. */
-  K getKey();
-
-  /**
-   * Returns the state associated with {@code address} in the specified {@code namespace}.
-   */
-  <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address);
-
-  /**
-   * Returns the state associated with {@code address} in the specified {@code namespace},
-   * using {@code c} as the {@link StateContext}.
-   */
-  <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java
deleted file mode 100644
index 0b33ea9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Static helpers for prefetching, merging and clearing state across windows under merge.
- */
-public class StateMerging {
-  /**
-   * Clear all state in {@code address} in all windows under merge (even result windows)
-   * in {@code context}.
-   */
-  public static <K, StateT extends State, W extends BoundedWindow> void clear(
-      MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) {
-    for (StateT state : context.accessInEachMergingWindow(address).values()) {
-      state.clear();
-    }
-  }
-
-  /**
-   * Prefetch all bag state in {@code address} across all windows under merge in
-   * {@code context}, except for the bag state in the final state address window which we can
-   * blindly append to.
-   */
-  public static <K, T, W extends BoundedWindow> void prefetchBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
-    Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
-    if (map.isEmpty()) {
-      // Nothing to prefetch.
-      return;
-    }
-    BagState<T> result = context.access(address);
-    // Prefetch everything except what's already in result.
-    for (BagState<T> source : map.values()) {
-      if (!source.equals(result)) {
-        source.readLater();
-      }
-    }
-  }
-
-  /**
-   * Merge all bag state in {@code address} across all windows under merge.
-   */
-  public static <K, T, W extends BoundedWindow> void mergeBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
-    mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address));
-  }
-
-  /**
-   * Merge all bag state in {@code sources} (which may include {@code result}) into {@code result}.
-   */
-  public static <T, W extends BoundedWindow> void mergeBags(
-      Collection<BagState<T>> sources, BagState<T> result) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    // Prefetch everything except what's already in result.
-    List<ReadableState<Iterable<T>>> futures = new ArrayList<>(sources.size());
-    for (BagState<T> source : sources) {
-      if (!source.equals(result)) {
-        source.readLater();
-        futures.add(source);
-      }
-    }
-    if (futures.isEmpty()) {
-      // Result already holds all the values.
-      return;
-    }
-    // Transfer from sources to result.
-    for (ReadableState<Iterable<T>> future : futures) {
-      for (T element : future.read()) {
-        result.add(element);
-      }
-    }
-    // Clear sources except for result.
-    for (BagState<T> source : sources) {
-      if (!source.equals(result)) {
-        source.clear();
-      }
-    }
-  }
-
-  /**
-   * Prefetch all combining value state for {@code address} across all merging windows in {@code
-   * context}.
-   */
-  public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void
-      prefetchCombiningValues(MergingStateAccessor<K, W> context,
-          StateTag<? super K, StateT> address) {
-    for (StateT state : context.accessInEachMergingWindow(address).values()) {
-      state.readLater();
-    }
-  }
-
-  /**
-   * Merge all value state in {@code address} across all merging windows in {@code context}.
-   */
-  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
-      MergingStateAccessor<K, W> context,
-      StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) {
-    mergeCombiningValues(
-        context.accessInEachMergingWindow(address).values(), context.access(address));
-  }
-
-  /**
-   * Merge all value state from {@code sources} (which may include {@code result}) into
-   * {@code result}. Every source is cleared before the merged accumulator is added to it.
-   */
-  public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
-      Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
-      AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    if (sources.size() == 1 && sources.contains(result)) {
-      // Result already holds combined value.
-      return;
-    }
-    // Prefetch.
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
-      source.readLater();
-    }
-    // Read: exactly one accumulator per source, so presize from sources.
-    List<AccumT> accumulators = new ArrayList<>(sources.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
-      accumulators.add(source.getAccum());
-    }
-    // Merge (possibly update and return one of the existing accumulators).
-    AccumT merged = result.mergeAccumulators(accumulators);
-    // Clear sources.
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
-      source.clear();
-    }
-    // Update result.
-    result.addAccum(merged);
-  }
-
-  /**
-   * Prefetch all watermark state for {@code address} across all merging windows in
-   * {@code context}.
-   */
-  public static <K, W extends BoundedWindow> void prefetchWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState<W>> address) {
-    Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
-    WatermarkHoldState<W> result = context.access(address);
-    if (map.isEmpty()) {
-      // Nothing to prefetch.
-      return;
-    }
-    if (map.size() == 1 && map.values().contains(result)
-        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
-      // Nothing to change.
-      return;
-    }
-    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
-      // No need to read existing holds.
-      return;
-    }
-    // Prefetch.
-    for (WatermarkHoldState<W> source : map.values()) {
-      source.readLater();
-    }
-  }
-
-  /**
-   * Merge all watermark state in {@code address} across all merging windows in {@code context},
-   * where the final merge result window is {@code mergeResult}.
-   */
-  public static <K, W extends BoundedWindow> void mergeWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState<W>> address,
-      W mergeResult) {
-    mergeWatermarks(
-        context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
-  }
-
-  /**
-   * Merge all watermark state in {@code sources} (which must include {@code result} if non-empty)
-   * into {@code result}, where the final merge result window is {@code resultWindow}.
-   */
-  public static <W extends BoundedWindow> void mergeWatermarks(
-      Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
-      W resultWindow) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    if (sources.size() == 1 && sources.contains(result)
-        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
-      // Nothing to merge.
-      return;
-    }
-    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
-      // Clear sources.
-      for (WatermarkHoldState<W> source : sources) {
-        source.clear();
-      }
-      // Update directly from window-derived hold.
-      Instant hold = result.getOutputTimeFn().assignOutputTime(
-          BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
-      Preconditions.checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
-      result.add(hold);
-    } else {
-      // Prefetch: request every hold up front, as mergeBags does, before reading any of them.
-      List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
-      for (WatermarkHoldState<W> source : sources) {
-        source.readLater();
-        futures.add(source);
-      }
-      // Read.
-      List<Instant> outputTimesToMerge = new ArrayList<>(sources.size());
-      for (ReadableState<Instant> future : futures) {
-        Instant sourceOutputTime = future.read();
-        if (sourceOutputTime != null) {
-          outputTimesToMerge.add(sourceOutputTime);
-        }
-      }
-      // Clear sources.
-      for (WatermarkHoldState<W> source : sources) {
-        source.clear();
-      }
-      if (!outputTimesToMerge.isEmpty()) {
-        // Merge and update.
-        result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java
deleted file mode 100644
index f972e31..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import java.io.IOException;
-
-/**
- * A namespace used for scoping state stored with {@link StateInternals}.
- *
- * <p>Instances of {@code StateNamespace} are guaranteed to have a {@link #hashCode} and
- * {@link #equals} that uniquely identify the namespace.
- */
-public interface StateNamespace {
-
-  /**
-   * Return the {@link String} key of this namespace. It is guaranteed that this
-   * {@code String} will uniquely identify the namespace.
-   *
-   * <p>This will encode the actual namespace as a {@code String}. It is
-   * preferable to use the {@code StateNamespace} object when possible.
-   *
-   * <p>The string produced by the standard implementations will not contain a '+' character. This
-   * enables adding a '+' between the actual namespace and other information, if needed, to separate
-   * the two.
-   */
-  String stringKey();
-
-  /**
-   * Append the string key of this namespace to the {@link Appendable} {@code sb}.
-   */
-  void appendTo(Appendable sb) throws IOException;
-
-  /**
-   * Return an {@code Object} to use as a key in a cache.
-   *
-   * <p>Different namespaces may use the same key in order to be treated as a unit in the cache.
-   * The {@code Object}'s {@code hashCode} and {@code equals} methods will be used to determine
-   * equality.
-   */
-  Object getCacheKey();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java
deleted file mode 100644
index 09b86d6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- * A simple {@link StateNamespace} for tests, identified solely by its (possibly null) string key.
- */
-public class StateNamespaceForTest implements StateNamespace {
-  private final String key;
-
-  public StateNamespaceForTest(String key) {
-    this.key = key;
-  }
-
-  @Override
-  public String stringKey() {
-    return key;
-  }
-
-  @Override
-  public Object getCacheKey() {
-    return key;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if (!(obj instanceof StateNamespaceForTest)) {
-      return false;
-    }
-
-    return Objects.equals(this.key, ((StateNamespaceForTest) obj).key);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(key); // null-safe, so it agrees with equals() for a null key
-  }
-
-  @Override
-  public void appendTo(Appendable sb) throws IOException {
-    sb.append(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java
deleted file mode 100644
index 8fee995..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.common.base.Splitter;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Factory methods for creating the {@link StateNamespace StateNamespaces}.
- */
-public class StateNamespaces {
-  private enum Namespace {
-    GLOBAL,
-    WINDOW,
-    WINDOW_AND_TRIGGER;
-  }
-
-  /** Returns the namespace that is global to the current key being processed. */
-  public static StateNamespace global() {
-    return new GlobalNamespace();
-  }
-
-  /** Returns the namespace scoped to {@code window}. */
-  public static <W extends BoundedWindow> StateNamespace window(Coder<W> windowCoder, W window) {
-    return new WindowNamespace<>(windowCoder, window);
-  }
-
-  /** Returns the namespace scoped to {@code window} and trigger index {@code triggerIdx}. */
-  public static <W extends BoundedWindow>
-  StateNamespace windowAndTrigger(Coder<W> windowCoder, W window, int triggerIdx) {
-    return new WindowAndTriggerNamespace<>(windowCoder, window, triggerIdx);
-  }
-
-  private StateNamespaces() {}
-
-  /**
-   * {@link StateNamespace} that is global to the current key being processed.
-   */
-  public static class GlobalNamespace implements StateNamespace {
-    private static final String GLOBAL_STRING = "/";
-
-    @Override
-    public String stringKey() {
-      return GLOBAL_STRING;
-    }
-
-    @Override
-    public Object getCacheKey() {
-      return GLOBAL_STRING;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return obj == this || obj instanceof GlobalNamespace;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(Namespace.GLOBAL);
-    }
-
-    @Override
-    public String toString() {
-      return "Global";
-    }
-
-    @Override
-    public void appendTo(Appendable sb) throws IOException {
-      sb.append(GLOBAL_STRING);
-    }
-  }
-
-  /**
-   * {@link StateNamespace} that is scoped to a specific window.
-   */
-  public static class WindowNamespace<W extends BoundedWindow> implements StateNamespace {
-    private static final String WINDOW_FORMAT = "/%s/";
-
-    private final Coder<W> windowCoder;
-    private final W window;
-
-    private WindowNamespace(Coder<W> windowCoder, W window) {
-      this.windowCoder = windowCoder;
-      this.window = window;
-    }
-
-    public W getWindow() {
-      return window;
-    }
-
-    @Override
-    public String stringKey() {
-      try {
-        return String.format(WINDOW_FORMAT, CoderUtils.encodeToBase64(windowCoder, window));
-      } catch (CoderException e) {
-        throw new RuntimeException("Unable to generate string key from window " + window, e);
-      }
-    }
-
-    @Override
-    public void appendTo(Appendable sb) throws IOException {
-      sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window)).append('/');
-    }
-
-    /**
-     * State in the same window will all be evicted together.
-     */
-    @Override
-    public Object getCacheKey() {
-      return window;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof WindowNamespace)) {
-        return false;
-      }
-
-      WindowNamespace<?> that = (WindowNamespace<?>) obj;
-      return Objects.equals(this.window, that.window);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(Namespace.WINDOW, window);
-    }
-
-    @Override
-    public String toString() {
-      return "Window(" + window + ")";
-    }
-  }
-
-  /**
-   * {@link StateNamespace} that is scoped to a particular window and trigger index.
-   */
-  public static class WindowAndTriggerNamespace<W extends BoundedWindow>
-      implements StateNamespace {
-
-    private static final String WINDOW_AND_TRIGGER_FORMAT = "/%s/%s/";
-
-    private static final int TRIGGER_RADIX = 36;
-    private final Coder<W> windowCoder;
-    private final W window;
-    private final int triggerIndex;
-
-    private WindowAndTriggerNamespace(Coder<W> windowCoder, W window, int triggerIndex) {
-      this.windowCoder = windowCoder;
-      this.window = window;
-      this.triggerIndex = triggerIndex;
-    }
-
-    public W getWindow() {
-      return window;
-    }
-
-    public int getTriggerIndex() {
-      return triggerIndex;
-    }
-
-    @Override
-    public String stringKey() {
-      try {
-        return String.format(WINDOW_AND_TRIGGER_FORMAT,
-            CoderUtils.encodeToBase64(windowCoder, window),
-            // Base 36 keeps up to 36 triggers in one human-readable character. Locale.ROOT
-            // keeps the digit 'i' from becoming a dotted capital under a Turkish default locale.
-            Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase(java.util.Locale.ROOT));
-      } catch (CoderException e) {
-        throw new RuntimeException("Unable to generate string key from window " + window, e);
-      }
-    }
-
-    @Override
-    public void appendTo(Appendable sb) throws IOException {
-      sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window)).append('/');
-      sb.append(Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase(java.util.Locale.ROOT));
-      sb.append('/');
-    }
-
-    /**
-     * State in the same window will all be evicted together.
-     */
-    @Override
-    public Object getCacheKey() {
-      return window;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof WindowAndTriggerNamespace)) {
-        return false;
-      }
-
-      WindowAndTriggerNamespace<?> that = (WindowAndTriggerNamespace<?>) obj;
-      return this.triggerIndex == that.triggerIndex
-          && Objects.equals(this.window, that.window);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(Namespace.WINDOW_AND_TRIGGER, window, triggerIndex);
-    }
-
-    @Override
-    public String toString() {
-      return "WindowAndTrigger(" + window + "," + triggerIndex + ")";
-    }
-  }
-
-  private static final Splitter SLASH_SPLITTER = Splitter.on('/');
-
-  /**
-   * Convert a {@code stringKey} produced using {@link StateNamespace#stringKey}
-   * on one of the namespaces produced by this class into the original
-   * {@link StateNamespace}. Throws {@link RuntimeException} if the string is malformed.
-   */
-  public static <W extends BoundedWindow> StateNamespace fromString(
-      String stringKey, Coder<W> windowCoder) {
-    if (!stringKey.startsWith("/") || !stringKey.endsWith("/")) {
-      throw new RuntimeException("Invalid namespace string: '" + stringKey + "'");
-    }
-
-    if (GlobalNamespace.GLOBAL_STRING.equals(stringKey)) {
-      return global();
-    }
-
-    List<String> parts = SLASH_SPLITTER.splitToList(stringKey);
-    if (parts.size() != 3 && parts.size() != 4) {
-      throw new RuntimeException("Invalid namespace string: '" + stringKey + "'");
-    }
-    // Ends should be empty (we start and end with /)
-    if (!parts.get(0).isEmpty() || !parts.get(parts.size() - 1).isEmpty()) {
-      throw new RuntimeException("Invalid namespace string: '" + stringKey + "'");
-    }
-
-    try {
-      W window = CoderUtils.decodeFromBase64(windowCoder, parts.get(1));
-      if (parts.size() > 3) {
-        int index = Integer.parseInt(parts.get(2), WindowAndTriggerNamespace.TRIGGER_RADIX);
-        return windowAndTrigger(windowCoder, window, index);
-      } else {
-        return window(windowCoder, window);
-      }
-    } catch (Exception  e) {
-      throw new RuntimeException("Invalid namespace string: '" + stringKey + "'", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java
deleted file mode 100644
index edd1dae..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Table;
-import com.google.common.collect.Tables;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code State} instance.
- */
-public abstract class StateTable<K> {
-
-  private final Table<StateNamespace, StateTag<? super K, ?>, State> stateTable =
-      Tables.newCustomTable(new HashMap<StateNamespace, Map<StateTag<? super K, ?>, State>>(),
-          new Supplier<Map<StateTag<? super K, ?>, State>>() {
-        @Override
-        public Map<StateTag<? super K, ?>, State> get() {
-          return new HashMap<>();
-        }
-      });
-
-  /**
-   * Gets the {@link State} in the specified {@link StateNamespace} with the specified {@link
-   * StateTag}, binding it using the {@link #binderForNamespace} if it is not
-   * already present in this {@link StateTable}.
-   */
-  public <StateT extends State> StateT get(
-      StateNamespace namespace, StateTag<? super K, StateT> tag, StateContext<?> c) {
-    State storage = stateTable.get(namespace, tag);
-    if (storage != null) {
-      @SuppressWarnings("unchecked")
-      StateT typedStorage = (StateT) storage;
-      return typedStorage;
-    }
-
-    StateT typedStorage = tag.bind(binderForNamespace(namespace, c));
-    stateTable.put(namespace, tag, typedStorage);
-    return typedStorage;
-  }
-
-  public void clearNamespace(StateNamespace namespace) {
-    stateTable.rowKeySet().remove(namespace);
-  }
-
-  public void clear() {
-    stateTable.clear();
-  }
-
-  public Iterable<State> values() {
-    return stateTable.values();
-  }
-
-  public boolean isNamespaceInUse(StateNamespace namespace) {
-    return stateTable.containsRow(namespace);
-  }
-
-  public Map<StateTag<? super K, ?>, State> getTagsInUse(StateNamespace namespace) {
-    return stateTable.row(namespace);
-  }
-
-  public Set<StateNamespace> getNamespacesInUse() {
-    return stateTable.rowKeySet();
-  }
-
-  /**
-   * Provide the {@code StateBinder} to use for creating {@code Storage} instances
-   * in the specified {@code namespace}.
-   */
-  protected abstract StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java
deleted file mode 100644
index c87bdb7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An address for persistent state. This includes a unique identifier for the location, the
- * information necessary to encode the value, and details about the intended access pattern.
- *
- * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column
- * that has cells of type {@code StateT}.
- *
- * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}.
- *
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- *            accept values of type {@code KeyedStateTag<? super K, StateT>}.
- * @param <StateT> The type of state being tagged.
- */
-@Experimental(Kind.STATE)
-public interface StateTag<K, StateT extends State> extends Serializable {
-
-  /**
-   * Visitor for binding a {@link StateTag} and to the associated {@link State}.
-   *
-   * @param <K> the type of key this binder embodies.
-   */
-  public interface StateBinder<K> {
-    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder);
-
-    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder);
-
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-    bindCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn);
-
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-    bindKeyedCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-    bindKeyedCombiningValueWithContext(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn);
-
-    /**
-     * Bind to a watermark {@link StateTag}.
-     *
-     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps
-     * added to the returned {@link WatermarkHoldState} are to be combined.
-     */
-    <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn);
-  }
-
-  /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
-  void appendTo(Appendable sb) throws IOException;
-
-  /**
-   * Returns the user-provided name of this state cell.
-   */
-  String getId();
-
-  /**
-   * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
-   */
-  StateT bind(StateBinder<? extends K> binder);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java
deleted file mode 100644
index ec9a78f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.common.base.MoreObjects;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * Static utility methods for creating {@link StateTag} instances.
- */
-@Experimental(Kind.STATE)
-public class StateTags {
-
-  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
-
-  static {
-    STANDARD_REGISTRY.registerStandardCoders();
-  }
-
-  private enum StateKind {
-    SYSTEM('s'),
-    USER('u');
-
-    private char prefix;
-
-    StateKind(char prefix) {
-      this.prefix = prefix;
-    }
-  }
-
-  private StateTags() { }
-
-  private interface SystemStateTag<K, StateT extends State> {
-    StateTag<K, StateT> asKind(StateKind kind);
-  }
-
-  /**
-   * Create a simple state tag for values of type {@code T}.
-   */
-  public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
-    return new ValueStateTag<>(new StructuredId(id), valueCoder);
-  }
-
-  /**
-   * Create a state tag for values that use a {@link CombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <InputT, AccumT, OutputT>
-    StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-    combiningValue(
-      String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return combiningValueInternal(id, accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}. The key provided to the
-   * {@link KeyedCombineFn} comes from the keyed {@link StateAccessor}.
-   */
-  public static <K, InputT, AccumT,
-      OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      keyedCombiningValue(String id, Coder<AccumT> accumCoder,
-          KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return keyedCombiningValueInternal(id, accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically
-   * merge multiple {@code InputT}s into a single {@code OutputT}. The key provided to the
-   * {@link KeyedCombineFn} comes from the keyed {@link StateAccessor}, the context provided comes
-   * from the {@link StateContext}.
-   */
-  public static <K, InputT, AccumT, OutputT>
-      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      keyedCombiningValueWithContext(
-          String id,
-          Coder<AccumT> accumCoder,
-          KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>(
-        new StructuredId(id),
-        accumCoder,
-        combineFn);
-  }
-
-  /**
-   * Create a state tag for values that use a {@link CombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   *
-   * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and
-   * should only be used to initialize static values.
-   */
-  public static <InputT, AccumT, OutputT>
-      StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      combiningValueFromInputInternal(
-          String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    try {
-      Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
-      return combiningValueInternal(id, accumCoder, combineFn);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to determine accumulator coder for " + combineFn.getClass().getSimpleName()
-          + " from " + inputCoder, e);
-    }
-  }
-
-  private static <InputT, AccumT,
-      OutputT> StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      combiningValueInternal(
-      String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return
-        new CombiningValueStateTag<InputT, AccumT, OutputT>(
-            new StructuredId(id), accumCoder, combineFn);
-  }
-
-  private static <K, InputT, AccumT, OutputT>
-      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
-          String id,
-          Coder<AccumT> accumCoder,
-          KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>(
-        new StructuredId(id), accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state tag that is optimized for adding values frequently, and
-   * occasionally retrieving all the values that have been added.
-   */
-  public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
-    return new BagStateTag<T>(new StructuredId(id), elemCoder);
-  }
-
-  /**
-   * Create a state tag for holding the watermark.
-   */
-  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
-      watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
-    return new WatermarkStateTagInternal<W>(new StructuredId(id), outputTimeFn);
-  }
-
-  /**
-   * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
-   * collide with any user tags.
-   */
-  public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
-      StateTag<K, StateT> tag) {
-    if (!(tag instanceof SystemStateTag)) {
-      throw new IllegalArgumentException("Expected subclass of StateTagBase, got " + tag);
-    }
-    // Checked above
-    @SuppressWarnings("unchecked")
-    SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag;
-    return typedTag.asKind(StateKind.SYSTEM);
-  }
-
-  public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
-      convertToBagTagInternal(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
-    if (combiningTag instanceof KeyedCombiningValueStateTag) {
-      // Checked above; conversion to a bag tag depends on the provided tag being one of those
-      // created via the factory methods in this class.
-      @SuppressWarnings("unchecked")
-      KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT> typedTag =
-          (KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>) combiningTag;
-      return typedTag.asBagTag();
-    } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) {
-      @SuppressWarnings("unchecked")
-      KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT> typedTag =
-          (KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>) combiningTag;
-      return typedTag.asBagTag();
-    } else {
-      throw new IllegalArgumentException("Unexpected StateTag " + combiningTag);
-    }
-  }
-
-  private static class StructuredId implements Serializable {
-    private final StateKind kind;
-    private final String rawId;
-
-    private StructuredId(String rawId) {
-      this(StateKind.USER, rawId);
-    }
-
-    private StructuredId(StateKind kind, String rawId) {
-      this.kind = kind;
-      this.rawId = rawId;
-    }
-
-    public StructuredId asKind(StateKind kind) {
-      return new StructuredId(kind, rawId);
-    }
-
-    public void appendTo(Appendable sb) throws IOException {
-      sb.append(kind.prefix).append(rawId);
-    }
-
-    public String getRawId() {
-      return rawId;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("id", rawId)
-          .add("kind", kind)
-          .toString();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof StructuredId)) {
-        return false;
-      }
-
-      StructuredId that = (StructuredId) obj;
-      return Objects.equals(this.kind, that.kind)
-          && Objects.equals(this.rawId, that.rawId);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(kind, rawId);
-    }
-  }
-
-  /**
-   * A base class that just manages the structured ids.
-   */
-  private abstract static class StateTagBase<K, StateT extends State>
-      implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
-
-    protected final StructuredId id;
-
-    protected StateTagBase(StructuredId id) {
-      this.id = id;
-    }
-
-    @Override
-    public String getId() {
-      return id.getRawId();
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("id", id)
-          .toString();
-    }
-
-    @Override
-    public void appendTo(Appendable sb) throws IOException {
-      id.appendTo(sb);
-    }
-
-    @Override
-    public abstract StateTag<K, StateT> asKind(StateKind kind);
-  }
-
-  /**
-   * A value state cell for values of type {@code T}.
-   *
-   * @param <T> the type of value being stored
-   */
-  private static class ValueStateTag<T> extends StateTagBase<Object, ValueState<T>>
-      implements StateTag<Object, ValueState<T>> {
-
-    private final Coder<T> coder;
-
-    private ValueStateTag(StructuredId id, Coder<T> coder) {
-      super(id);
-      this.coder = coder;
-    }
-
-    @Override
-    public ValueState<T> bind(StateBinder<? extends Object> visitor) {
-      return visitor.bindValue(this, coder);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof ValueStateTag)) {
-        return false;
-      }
-
-      ValueStateTag<?> that = (ValueStateTag<?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.coder, that.coder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, coder);
-    }
-
-    @Override
-    public StateTag<Object, ValueState<T>> asKind(StateKind kind) {
-      return new ValueStateTag<T>(id.asKind(kind), coder);
-    }
-  }
-
-  /**
-   * A state cell for values that are combined according to a {@link CombineFn}.
-   *
-   * @param <InputT> the type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  private static class CombiningValueStateTag<InputT, AccumT, OutputT>
-      extends KeyedCombiningValueStateTag<Object, InputT, AccumT, OutputT>
-      implements StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>,
-      SystemStateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
-    private final Coder<AccumT> accumCoder;
-    private final CombineFn<InputT, AccumT, OutputT> combineFn;
-
-    private CombiningValueStateTag(
-        StructuredId id,
-        Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-      super(id, accumCoder, combineFn.asKeyedFn());
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-    asKind(StateKind kind) {
-      return new CombiningValueStateTag<InputT, AccumT, OutputT>(
-          id.asKind(kind), accumCoder, combineFn);
-    }
-  }
-
-  /**
-   * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}.
-   *
-   * @param <K> the type of keys
-   * @param <InputT> the type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  private static class KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>
-    extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-    implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
-    private final Coder<AccumT> accumCoder;
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-
-    protected KeyedCombiningValueWithContextStateTag(
-        StructuredId id,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-      super(id);
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
-        StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof KeyedCombiningValueWithContextStateTag)) {
-        return false;
-      }
-
-      KeyedCombiningValueWithContextStateTag<?, ?, ?, ?> that =
-          (KeyedCombiningValueWithContextStateTag<?, ?, ?, ?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.accumCoder, that.accumCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, accumCoder);
-    }
-
-    @Override
-    public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
-        StateKind kind) {
-      return new KeyedCombiningValueWithContextStateTag<>(
-          id.asKind(kind), accumCoder, combineFn);
-    }
-
-    private StateTag<Object, BagState<AccumT>> asBagTag() {
-      return new BagStateTag<AccumT>(id, accumCoder);
-    }
-  }
-
-  /**
-   * A state cell for values that are combined according to a {@link KeyedCombineFn}.
-   *
-   * @param <K> the type of keys
-   * @param <InputT> the type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  private static class KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>
-      extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
-    private final Coder<AccumT> accumCoder;
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    protected KeyedCombiningValueStateTag(
-        StructuredId id,
-        Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      super(id);
-      this.keyedCombineFn = keyedCombineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
-        StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof CombiningValueStateTag)) {
-        return false;
-      }
-
-      KeyedCombiningValueStateTag<?, ?, ?, ?> that = (KeyedCombiningValueStateTag<?, ?, ?, ?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.accumCoder, that.accumCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, accumCoder);
-    }
-
-    @Override
-    public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
-        StateKind kind) {
-      return new KeyedCombiningValueStateTag<>(id.asKind(kind), accumCoder, keyedCombineFn);
-    }
-
-    private StateTag<Object, BagState<AccumT>> asBagTag() {
-      return new BagStateTag<AccumT>(id, accumCoder);
-    }
-  }
-
-  /**
-   * A state cell optimized for bag-like access patterns (frequent additions, occasional reads
-   * of all the values).
-   *
-   * @param <T> the type of value in the bag
-   */
-  private static class BagStateTag<T> extends StateTagBase<Object, BagState<T>>
-      implements StateTag<Object, BagState<T>>{
-
-    private final Coder<T> elemCoder;
-
-    private BagStateTag(StructuredId id, Coder<T> elemCoder) {
-      super(id);
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public BagState<T> bind(StateBinder<? extends Object> visitor) {
-      return visitor.bindBag(this, elemCoder);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof BagStateTag)) {
-        return false;
-      }
-
-      BagStateTag<?> that = (BagStateTag<?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.elemCoder, that.elemCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, elemCoder);
-    }
-
-    @Override
-    public StateTag<Object, BagState<T>> asKind(StateKind kind) {
-      return new BagStateTag<>(id.asKind(kind), elemCoder);
-    }
-  }
-
-  private static class WatermarkStateTagInternal<W extends BoundedWindow>
-      extends StateTagBase<Object, WatermarkHoldState<W>> {
-
-    /**
-     * When multiple output times are added to hold the watermark, this determines how they are
-     * combined, and also the behavior when merging windows. Does not contribute to equality/hash
-     * since we have at most one watermark hold tag per computation.
-     */
-    private final OutputTimeFn<? super W> outputTimeFn;
-
-    private WatermarkStateTagInternal(StructuredId id, OutputTimeFn<? super W> outputTimeFn) {
-      super(id);
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public WatermarkHoldState<W> bind(StateBinder<? extends Object> visitor) {
-      return visitor.bindWatermark(this, outputTimeFn);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof WatermarkStateTagInternal)) {
-        return false;
-      }
-
-      WatermarkStateTagInternal<?> that = (WatermarkStateTagInternal<?>) obj;
-      return Objects.equals(this.id, that.id);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id);
-    }
-
-    @Override
-    public StateTag<Object, WatermarkHoldState<W>> asKind(StateKind kind) {
-      return new WatermarkStateTagInternal<W>(id.asKind(kind), outputTimeFn);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java
deleted file mode 100644
index 19c12bb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-
-/**
- * State holding a single value.
- *
- * @param <T> The type of values being stored.
- */
-@Experimental(Kind.STATE)
-public interface ValueState<T> extends ReadableState<T>, State {
-  /**
-   * Set the value of the buffer.
-   */
-  void write(T input);
-
-  @Override
-  ValueState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java
deleted file mode 100644
index 8a1adc9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-
-import org.joda.time.Instant;
-
-/**
- * A {@link State} accepting and aggregating output timestamps, which determines
- * the time to which the output watermark must be held.
- *
- * <p><b><i>For internal use only. This API may change at any time.</i></b>
- */
-@Experimental(Kind.STATE)
-public interface WatermarkHoldState<W extends BoundedWindow>
-    extends CombiningState<Instant, Instant> {
-  /**
-   * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
-   * an element timestamp, and to combine watermarks from windows which are about to be merged.
-   */
-  OutputTimeFn<? super W> getOutputTimeFn();
-
-  @Override
-  WatermarkHoldState<W> readLater();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java
deleted file mode 100644
index 23cee07..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-import com.google.common.base.MoreObjects;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Objects;
-
-/**
- * An immutable key/value pair.
- *
- * <p>Various {@link PTransform PTransforms} like {@link GroupByKey} and {@link Combine#perKey}
- * operate on {@link PCollection PCollections} of {@link KV KVs}.
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class KV<K, V> implements Serializable {
-  /** Returns a {@link KV} with the given key and value. */
-  public static <K, V> KV<K, V> of(K key, V value) {
-    return new KV<>(key, value);
-  }
-
-  /** Returns the key of this {@link KV}. */
-  public K getKey() {
-    return key;
-  }
-
-  /** Returns the value of this {@link KV}. */
-  public V getValue() {
-    return value;
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  final K key;
-  final V value;
-
-  private KV(K key, V value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof KV)) {
-      return false;
-    }
-    KV<?, ?> otherKv = (KV<?, ?>) other;
-    // Arrays are very common as values and keys, so deepEquals is mandatory
-    return Objects.deepEquals(this.key, otherKv.key)
-        && Objects.deepEquals(this.value, otherKv.value);
-  }
-
-  /**
-   * A {@link Comparator} that orders {@link KV KVs} by the natural ordering of their keys.
-   *
-   * <p>A {@code null} key is less than any non-{@code null} key.
-   */
-  public static class OrderByKey<K extends Comparable<? super K>, V> implements
-      SerializableComparator<KV<K, V>> {
-    @Override
-    public int compare(KV<K, V> a, KV<K, V> b) {
-      if (a.key == null) {
-        return b.key == null ? 0 : -1;
-      } else if (b.key == null) {
-        return 1;
-      } else {
-        return a.key.compareTo(b.key);
-      }
-    }
-  }
-
-  /**
-   * A {@link Comparator} that orders {@link KV KVs} by the natural ordering of their values.
-   *
-   * <p>A {@code null} value is less than any non-{@code null} value.
-   */
-  public static class OrderByValue<K, V extends Comparable<? super V>>
-      implements SerializableComparator<KV<K, V>> {
-    @Override
-    public int compare(KV<K, V> a, KV<K, V> b) {
-      if (a.value == null) {
-        return b.value == null ? 0 : -1;
-      } else if (b.value == null) {
-        return 1;
-      } else {
-        return a.value.compareTo(b.value);
-      }
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    // Objects.deepEquals requires Arrays.deepHashCode for correctness
-    return Arrays.deepHashCode(new Object[]{key, value});
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .addValue(key)
-        .addValue(value)
-        .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java
deleted file mode 100644
index 23ac3ae..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO.Read;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * {@link PBegin} is the "input" to a root {@link PTransform}, such as {@link Read Read} or
- * {@link Create}.
- *
- * <p>Typically created by calling {@link Pipeline#begin} on a Pipeline.
- */
-public class PBegin implements PInput {
-  /**
-   * Returns a {@link PBegin} in the given {@link Pipeline}.
-   */
-  public static PBegin in(Pipeline pipeline) {
-    return new PBegin(pipeline);
-  }
-
-  /**
-   * Like {@link #apply(String, PTransform)} but defaulting to the name
-   * of the {@link PTransform}.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      PTransform<? super PBegin, OutputT> t) {
-    return Pipeline.applyTransform(this, t);
-  }
-
-  /**
-   * Applies the given {@link PTransform} to this input {@link PBegin},
-   * using {@code name} to identify this specific application of the transform.
-   * This name is used in various places, including the monitoring UI, logging,
-   * and to stably identify this application node in the job graph.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<? super PBegin, OutputT> t) {
-    return Pipeline.applyTransform(name, this, t);
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public Collection<? extends PValue> expand() {
-    // A PBegin contains no PValues.
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void finishSpecifying() {
-    // Nothing more to be done.
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Constructs a {@link PBegin} in the given {@link Pipeline}.
-   */
-  protected PBegin(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  private final Pipeline pipeline;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
deleted file mode 100644
index 6fffddf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-
-/**
- * A {@link PCollection PCollection&lt;T&gt;} is an immutable collection of values of type
- * {@code T}.  A {@link PCollection} can contain either a bounded or unbounded
- * number of elements.  Bounded and unbounded {@link PCollection PCollections} are produced
- * as the output of {@link PTransform PTransforms}
- * (including root PTransforms like {@link Read} and {@link Create}), and can
- * be passed as the inputs of other PTransforms.
- *
- * <p>Some root transforms produce bounded {@code PCollections} and others
- * produce unbounded ones.  For example, {@link TextIO.Read} reads a static set
- * of files, so it produces a bounded {@link PCollection}.
- * {@link PubsubIO.Read}, on the other hand, receives a potentially infinite stream
- * of Pubsub messages, so it produces an unbounded {@link PCollection}.
- *
- * <p>Each element in a {@link PCollection} may have an associated implicit
- * timestamp.  Readers assign timestamps to elements when they create
- * {@link PCollection PCollections}, and other {@link PTransform PTransforms} propagate these
- * timestamps from their input to their output. For example, {@link PubsubIO.Read}
- * assigns pubsub message timestamps to elements, and {@link TextIO.Read} assigns
- * the default value {@link BoundedWindow#TIMESTAMP_MIN_VALUE} to elements. User code can
- * explicitly assign timestamps to elements with
- * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#outputWithTimestamp}.
- *
- * <p>Additionally, a {@link PCollection} has an associated
- * {@link WindowFn} and each element is assigned to a set of windows.
- * By default, the windowing function is {@link GlobalWindows}
- * and all elements are assigned into a single default window.
- * This default can be overridden with the {@link Window}
- * {@link PTransform}.
- *
- * <p>See the individual {@link PTransform} subclasses for specific information
- * on how they propagate timestamps and windowing.
- *
- * @param <T> the type of the elements of this {@link PCollection}
- */
-public class PCollection<T> extends TypedPValue<T> {
-
-  /**
-   * The enumeration of cases for whether a {@link PCollection} is bounded.
-   */
-  public enum IsBounded {
-    /**
-     * Indicates that a {@link PCollection} contains bounded data elements, such as
-     * {@link PCollection PCollections} from {@link TextIO}, {@link BigQueryIO},
-     * {@link Create}, etc.
-     */
-    BOUNDED,
-    /**
-     * Indicates that a {@link PCollection} contains unbounded data elements, such as
-     * {@link PCollection PCollections} from {@link PubsubIO}.
-     */
-    UNBOUNDED;
-
-    /**
-     * Returns the composed IsBounded property.
-     *
-     * <p>The composed property is {@link #BOUNDED} only if all components are {@link #BOUNDED}.
-     * Otherwise, it is {@link #UNBOUNDED}.
-     */
-    public IsBounded and(IsBounded that) {
-      if (this == BOUNDED && that == BOUNDED) {
-        return BOUNDED;
-      } else {
-        return UNBOUNDED;
-      }
-    }
-  }
-
-  /**
-   * Returns the name of this {@link PCollection}.
-   *
-   * <p>By default, the name of a {@link PCollection} is based on the name of the
-   * {@link PTransform} that produces it.  It can be specified explicitly by
-   * calling {@link #setName}.
-   *
-   * @throws IllegalStateException if the name hasn't been set yet
-   */
-  @Override
-  public String getName() {
-    return super.getName();
-  }
-
-  /**
-   * Sets the name of this {@link PCollection}.  Returns {@code this}.
-   *
-   * @throws IllegalStateException if this {@link PCollection} has already been
-   * finalized, after which its name may no longer be set.
-   * Once {@link #apply} has been called, this will be the case.
-   */
-  @Override
-  public PCollection<T> setName(String name) {
-    super.setName(name);
-    return this;
-  }
-
-  /**
-   * Returns the {@link Coder} used by this {@link PCollection} to encode and decode
-   * the values stored in it.
-   *
-   * @throws IllegalStateException if the {@link Coder} hasn't been set, and
-   * couldn't be inferred.
-   */
-  @Override
-  public Coder<T> getCoder() {
-    return super.getCoder();
-  }
-
-  /**
-   * Sets the {@link Coder} used by this {@link PCollection} to encode and decode the
-   * values stored in it. Returns {@code this}.
-   *
-   * @throws IllegalStateException if this {@link PCollection} has already
-   * been finalized, after which its {@link Coder} may no longer be set.
-   * Once {@link #apply} has been called, this will be the case.
-   */
-  @Override
-  public PCollection<T> setCoder(Coder<T> coder) {
-    super.setCoder(coder);
-    return this;
-  }
-
-  /**
-   * Like {@link #apply(String, PTransform)} but defaulting to the name
-   * of the {@link PTransform}.
-   *
-   * @return the output of the applied {@link PTransform}
-   */
-  public <OutputT extends POutput> OutputT apply(PTransform<? super PCollection<T>, OutputT> t) {
-    return Pipeline.applyTransform(this, t);
-  }
-
-  /**
-   * Applies the given {@link PTransform} to this input {@link PCollection},
-   * using {@code name} to identify this specific application of the transform.
-   * This name is used in various places, including the monitoring UI, logging,
-   * and to stably identify this application node in the job graph.
-   *
-   * @return the output of the applied {@link PTransform}
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<? super PCollection<T>, OutputT> t) {
-    return Pipeline.applyTransform(name, this, t);
-  }
-
-  /**
-   * Returns the {@link WindowingStrategy} of this {@link PCollection}.
-   */
-  public WindowingStrategy<?, ?> getWindowingStrategy() {
-    return windowingStrategy;
-  }
-
-  public IsBounded isBounded() {
-    return isBounded;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  /**
-   * {@link WindowingStrategy} that will be used for merging windows and triggering output in this
-   * {@link PCollection} and subsequent {@link PCollection PCollections} produced from this one.
-   *
-   * <p>By default, no merging is performed.
-   */
-  private WindowingStrategy<?, ?> windowingStrategy;
-
-  private IsBounded isBounded;
-
-  private PCollection(Pipeline p) {
-    super(p);
-  }
-
-  /**
-   * Sets the {@link TypeDescriptor TypeDescriptor&lt;T&gt;} for this
-   * {@link PCollection PCollection&lt;T&gt;}. This may allow the enclosing
-   * {@link PCollectionTuple}, {@link PCollectionList}, or {@code PTransform<?, PCollection<T>>},
-   * etc., to provide more detailed reflective information.
-   */
-  @Override
-  public PCollection<T> setTypeDescriptorInternal(TypeDescriptor<T> typeDescriptor) {
-    super.setTypeDescriptorInternal(typeDescriptor);
-    return this;
-  }
-
-  /**
-   * Sets the {@link WindowingStrategy} of this {@link PCollection}.
-   *
-   * <p>For use by primitive transformations only.
-   */
-  public PCollection<T> setWindowingStrategyInternal(WindowingStrategy<?, ?> windowingStrategy) {
-     this.windowingStrategy = windowingStrategy;
-     return this;
-  }
-
-  /**
-   * Sets the {@link PCollection.IsBounded} of this {@link PCollection}.
-   *
-   * <p>For use by internal transformations only.
-   */
-  public PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
-    this.isBounded = isBounded;
-    return this;
-  }
-
-  /**
-   * Creates and returns a new {@link PCollection} for a primitive output.
-   *
-   * <p>For use by primitive transformations only.
-   */
-  public static <T> PCollection<T> createPrimitiveOutputInternal(
-      Pipeline pipeline,
-      WindowingStrategy<?, ?> windowingStrategy,
-      IsBounded isBounded) {
-    return new PCollection<T>(pipeline)
-        .setWindowingStrategyInternal(windowingStrategy)
-        .setIsBoundedInternal(isBounded);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java
deleted file mode 100644
index b99af02..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Partition;
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A {@link PCollectionList PCollectionList&lt;T&gt;} is an immutable list of homogeneously
- * typed {@link PCollection PCollection&lt;T&gt;s}. A {@link PCollectionList} is used, for
- * instance, as the input to
- * {@link Flatten} or the output of {@link Partition}.
- *
- * <p>PCollectionLists can be created and accessed as follows:
- * <pre> {@code
- * PCollection<String> pc1 = ...;
- * PCollection<String> pc2 = ...;
- * PCollection<String> pc3 = ...;
- *
- * // Create a PCollectionList with three PCollections:
- * PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
- *
- * // Create an empty PCollectionList:
- * Pipeline p = ...;
- * PCollectionList<String> pcs2 = PCollectionList.<String>empty(p);
- *
- * // Get PCollections out of a PCollectionList, by index (origin 0):
- * PCollection<String> pcX = pcs.get(1);
- * PCollection<String> pcY = pcs.get(0);
- * PCollection<String> pcZ = pcs.get(2);
- *
- * // Get a list of all PCollections in a PCollectionList:
- * List<PCollection<String>> allPcs = pcs.getAll();
- * } </pre>
- *
- * @param <T> the type of the elements of all the {@link PCollection PCollections} in this list
- */
-public class PCollectionList<T> implements PInput, POutput {
-  /**
-   * Returns an empty {@link PCollectionList} that is part of the given {@link Pipeline}.
-   *
-   * <p>Longer {@link PCollectionList PCollectionLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <T> PCollectionList<T> empty(Pipeline pipeline) {
-    return new PCollectionList<>(pipeline);
-  }
-
-  /**
-   * Returns a singleton {@link PCollectionList} containing the given {@link PCollection}.
-   *
-   * <p>Longer {@link PCollectionList PCollectionLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <T> PCollectionList<T> of(PCollection<T> pc) {
-    return new PCollectionList<T>(pc.getPipeline()).and(pc);
-  }
-
-  /**
-   * Returns a {@link PCollectionList} containing the given {@link PCollection PCollections},
-   * in order.
-   *
-   * <p>The argument list cannot be empty.
-   *
-   * <p>All the {@link PCollection PCollections} in the resulting {@link PCollectionList} must be
-   * part of the same {@link Pipeline}.
-   *
-   * <p>Longer PCollectionLists can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <T> PCollectionList<T> of(Iterable<PCollection<T>> pcs) {
-    Iterator<PCollection<T>> pcsIter = pcs.iterator();
-    if (!pcsIter.hasNext()) {
-      throw new IllegalArgumentException(
-          "must either have a non-empty list of PCollections, " +
-          "or must first call empty(Pipeline)");
-    }
-    return new PCollectionList<T>(pcsIter.next().getPipeline()).and(pcs);
-  }
-
-  /**
-   * Returns a new {@link PCollectionList} that has all the {@link PCollection PCollections} of
-   * this {@link PCollectionList} plus the given {@link PCollection} appended to the end.
-   *
-   * <p>All the {@link PCollection PCollections} in the resulting {@link PCollectionList} must be
-   * part of the same {@link Pipeline}.
-   */
-  public PCollectionList<T> and(PCollection<T> pc) {
-    if (pc.getPipeline() != pipeline) {
-      throw new IllegalArgumentException(
-          "PCollections come from different Pipelines");
-    }
-    return new PCollectionList<>(pipeline,
-        new ImmutableList.Builder<PCollection<T>>()
-            .addAll(pcollections)
-            .add(pc)
-            .build());
-  }
-
-  /**
-   * Returns a new {@link PCollectionList} that has all the {@link PCollection PCollections} of
-   * this {@link PCollectionList} plus the given {@link PCollection PCollections} appended to the
-   * end, in order.
-   *
-   * <p>All the {@link PCollection PCollections} in the resulting {@link PCollectionList} must be
-   * part of the same {@link Pipeline}.
-   */
-  public PCollectionList<T> and(Iterable<PCollection<T>> pcs) {
-    List<PCollection<T>> copy = new ArrayList<>(pcollections);
-    for (PCollection<T> pc : pcs) {
-      if (pc.getPipeline() != pipeline) {
-        throw new IllegalArgumentException(
-            "PCollections come from different Pipelines");
-      }
-      copy.add(pc);
-    }
-    return new PCollectionList<>(pipeline, copy);
-  }
-
-  /**
-   * Returns the number of {@link PCollection PCollections} in this {@link PCollectionList}.
-   */
-  public int size() {
-    return pcollections.size();
-  }
-
-  /**
-   * Returns the {@link PCollection} at the given index (origin zero).
-   *
-   * @throws IndexOutOfBoundsException if the index is out of the range
-   * {@code [0..size()-1]}.
-   */
-  public PCollection<T> get(int index) {
-    return pcollections.get(index);
-  }
-
-  /**
-   * Returns an immutable List of all the {@link PCollection PCollections} in this
-   * {@link PCollectionList}.
-   */
-  public List<PCollection<T>> getAll() {
-    return pcollections;
-  }
-
-  /**
-   * Like {@link #apply(String, PTransform)} but defaulting to the name
-   * of the {@code PTransform}.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      PTransform<PCollectionList<T>, OutputT> t) {
-    return Pipeline.applyTransform(this, t);
-  }
-
-  /**
-   * Applies the given {@link PTransform} to this input {@link PCollectionList},
-   * using {@code name} to identify this specific application of the transform.
-   * This name is used in various places, including the monitoring UI, logging,
-   * and to stably identify this application node in the job graph.
-   *
-   * @return the output of the applied {@link PTransform}
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<PCollectionList<T>, OutputT> t) {
-    return Pipeline.applyTransform(name, this, t);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  final Pipeline pipeline;
-  final List<PCollection<T>> pcollections;
-
-  PCollectionList(Pipeline pipeline) {
-    this(pipeline, new ArrayList<PCollection<T>>());
-  }
-
-  PCollectionList(Pipeline pipeline, List<PCollection<T>> pcollections) {
-    this.pipeline = pipeline;
-    this.pcollections = Collections.unmodifiableList(pcollections);
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public Collection<? extends PValue> expand() {
-    return pcollections;
-  }
-
-  @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
-    int i = 0;
-    for (PCollection<T> pc : pcollections) {
-      pc.recordAsOutput(transform, "out" + i);
-      i++;
-    }
-  }
-
-  @Override
-  public void finishSpecifying() {
-    for (PCollection<T> pc : pcollections) {
-      pc.finishSpecifying();
-    }
-  }
-
-  @Override
-  public void finishSpecifyingOutput() {
-    for (PCollection<T> pc : pcollections) {
-      pc.finishSpecifyingOutput();
-    }
-  }
-}