You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:09 UTC

[03/19] beam git commit: Move Java sdk.util.state to sdk.state

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 28bbc3c..1db0e86 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -373,7 +373,7 @@
     <!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
   </Match>
   <Match>
-    <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/>
+    <Class name="StateSpecs$CombiningStateSpec"/>
     <Method name="equals"/>
     <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
     <!--[BEAM-421] Class doesn't override equals in superclass-->

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java
new file mode 100644
index 0000000..189d151
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+/**
+ * State containing a bag values. Items can be added to the bag and the contents read out.
+ *
+ * @param <T> The type of elements in the bag.
+ */
+public interface BagState<T> extends GroupingState<T, Iterable<T>> {
+  @Override
+  BagState<T> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java
new file mode 100644
index 0000000..6080127
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
+ * to {@link GroupingState} that includes the {@code AccumT} type.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <AccumT> the type of accumulator
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface CombiningState<InputT, AccumT, OutputT>
+    extends GroupingState<InputT, OutputT> {
+
+  /**
+   * Read the merged accumulator for this combining value. It is implied that reading the
+   * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
+   * this.
+   */
+  AccumT getAccum();
+
+  /**
+   * Add an accumulator to this combining value. Depending on implementation this may immediately
+   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
+   */
+  void addAccum(AccumT accum);
+
+  /**
+   * Merge the given accumulators according to the underlying combiner.
+   */
+  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
+
+  @Override
+  CombiningState<InputT, AccumT, OutputT> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
new file mode 100644
index 0000000..3a12e79
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
+ * {@code OutputT} value.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
+  /**
+   * Add a value to the buffer.
+   */
+  void add(InputT value);
+
+  /**
+   * Return true if this state is empty.
+   */
+  ReadableState<Boolean> isEmpty();
+
+  @Override
+  GroupingState<InputT, OutputT> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
new file mode 100644
index 0000000..9f0eee9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import java.util.Map;
+
+/**
+ * An object that maps keys to values.
+ * A map cannot contain duplicate keys;
+ * each key can map to at most one value.
+ *
+ * @param <K> the type of keys maintained by this map
+ * @param <V> the type of mapped values
+ */
+public interface MapState<K, V> extends State {
+
+  /**
+   * Associates the specified value with the specified key in this state.
+   */
+  void put(K key, V value);
+
+  /**
+   * A deferred read-followed-by-write.
+   *
+   * <p>When {@code read()} is called on the result or state is committed, it forces a read of the
+   * map and reconciliation with any pending modifications.
+   *
+   * <p>If the specified key is not already associated with a value (or is mapped to {@code null})
+   * associates it with the given value and returns {@code null}, else returns the current value.
+   */
+  ReadableState<V> putIfAbsent(K key, V value);
+
+  /**
+   * Removes the mapping for a key from this map if it is present.
+   */
+  void remove(K key);
+
+  /**
+   * A deferred lookup.
+   *
+   * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()}
+   * on the results.
+   *
+   * <p>When {@code read()} is called, a particular state implementation is encouraged to perform
+   * all pending reads in a single batch.
+   */
+  ReadableState<V> get(K key);
+
+  /**
+   * Returns a iterable view of the keys contained in this map.
+   */
+  ReadableState<Iterable<K>> keys();
+
+  /**
+   * Returns a iterable view of the values contained in this map.
+   */
+  ReadableState<Iterable<V>> values();
+
+  /**
+   * Returns a iterable view of all key-values.
+   */
+  ReadableState<Iterable<Map.Entry<K, V>>> entries();
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
new file mode 100644
index 0000000..b29ab26
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * {@link State} that can be read via {@link #read()}.
+ *
+ * <p>Use {@link #readLater()} for marking several states for prefetching. Runners
+ * can potentially batch these into one read.
+ *
+ * @param <T> The type of value returned by {@link #read}.
+ */
+@Experimental(Kind.STATE)
+public interface ReadableState<T> {
+  /**
+   * Read the current value, blocking until it is available.
+   *
+   * <p>If there will be many calls to {@link #read} for different state in short succession,
+   * you should first call {@link #readLater} for all of them so the reads can potentially be
+   * batched (depending on the underlying implementation}.
+   */
+  T read();
+
+  /**
+   * Indicate that the value will be read later.
+   *
+   * <p>This allows an implementation to start an asynchronous prefetch or
+   * to include this state in the next batch of reads.
+   *
+   * @return this for convenient chaining
+   */
+  ReadableState<T> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java
new file mode 100644
index 0000000..d8df04e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Utilities for constructing and manipulating {@link ReadableState} instances.
+ */
+@Experimental(Kind.STATE)
+public class ReadableStates {
+
+  /**
+   * A {@link ReadableState} constructed from a constant value, hence immediately available.
+   */
+  public static <T> ReadableState<T> immediate(final T value) {
+    return new ReadableState<T>() {
+      @Override
+      public T read() {
+        return value;
+      }
+
+      @Override
+      public ReadableState<T> readLater() {
+        return this;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
new file mode 100644
index 0000000..14aa640
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+/**
+ * State containing no duplicate elements.
+ * Items can be added to the set and the contents read out.
+ *
+ * @param <T> The type of elements in the set.
+ */
+public interface SetState<T> extends GroupingState<T, Iterable<T>> {
+  /**
+   * Returns true if this set contains the specified element.
+   */
+  ReadableState<Boolean> contains(T t);
+
+  /**
+   * Ensures a value is a member of the set, returning {@code true} if it was added and {@code
+   * false} otherwise.
+   */
+  ReadableState<Boolean> addIfAbsent(T t);
+
+  /**
+   * Removes the specified element from this set if it is present.
+   */
+  void remove(T t);
+
+  @Override
+  SetState<T> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java
new file mode 100644
index 0000000..6b10c91
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+/**
+ * Base interface for all state locations.
+ *
+ * <p>Specific types of state add appropriate accessors for reading and writing values, see
+ * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
+ */
+public interface State {
+
+  /**
+   * Clear out the state location.
+   */
+  void clear();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java
new file mode 100644
index 0000000..ee4aa78
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+
+/**
+ * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
+ */
+public interface StateBinder {
+  <T> ValueState<T> bindValue(
+      String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
+
+  <T> BagState<T> bindBag(
+      String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
+
+  <T> SetState<T> bindSet(
+      String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
+
+  <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+      String id,
+      StateSpec<MapState<KeyT, ValueT>> spec,
+      Coder<KeyT> mapKeyCoder,
+      Coder<ValueT> mapValueCoder);
+
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
+      String id,
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+      Coder<AccumT> accumCoder,
+      Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
+
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
+      String id,
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+      Coder<AccumT> accumCoder,
+      CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
+
+  /**
+   * Bind to a watermark {@link StateSpec}.
+   *
+   * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
+   * to the returned {@link WatermarkHoldState} are to be combined.
+   */
+  WatermarkHoldState bindWatermark(
+      String id,
+      StateSpec<WatermarkHoldState> spec,
+      TimestampCombiner timestampCombiner);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java
new file mode 100644
index 0000000..110a515
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Information accessible the state API.
+ */
+public interface StateContext<W extends BoundedWindow> {
+  /**
+   * Returns the {@code PipelineOptions} specified with the
+   * {@link org.apache.beam.sdk.runners.PipelineRunner}.
+   */
+  PipelineOptions getPipelineOptions();
+
+  /**
+   * Returns the value of the side input for the corresponding state window.
+   */
+  <T> T sideInput(PCollectionView<T> view);
+
+  /**
+   * Returns the window corresponding to the state.
+   */
+  W window();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java
new file mode 100644
index 0000000..63afe4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Factory that produces {@link StateContext} based on different inputs.
+ */
+public class StateContexts {
+  private static final StateContext<BoundedWindow> NULL_CONTEXT =
+      new StateContext<BoundedWindow>() {
+        @Override
+        public PipelineOptions getPipelineOptions() {
+          throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view) {
+          throw new IllegalArgumentException("cannot call sideInput() in a null context");
+        }
+
+        @Override
+        public BoundedWindow window() {
+          throw new IllegalArgumentException("cannot call window() in a null context");
+        }
+      };
+
+  /** Returns a fake {@link StateContext}. */
+  @SuppressWarnings("unchecked")
+  public static <W extends BoundedWindow> StateContext<W> nullContext() {
+    return (StateContext<W>) NULL_CONTEXT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
new file mode 100644
index 0000000..3b0b840
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * A specification of a persistent state cell. This includes information necessary to encode the
+ * value and details about the intended access pattern.
+ *
+ * @param <StateT> The type of state being described.
+ */
+@Experimental(Kind.STATE)
+public interface StateSpec<StateT extends State> extends Serializable {
+
+  /**
+   * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
+   */
+  StateT bind(String id, StateBinder binder);
+
+  /**
+   * Given {code coders} are inferred from type arguments defined for this class. Coders which are
+   * already set should take precedence over offered coders.
+   *
+   * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
+   *     coder could not be inferred.
+   */
+  void offerCoders(Coder[] coders);
+
+  /**
+   * Validates that this {@link StateSpec} has been specified correctly and finalizes it.
+   * Automatically invoked when the pipeline is built.
+   */
+  void finishSpecifying();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
new file mode 100644
index 0000000..09cc4e7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+
+/**
+ * Static utility methods for creating {@link StateSpec} instances.
+ */
+@Experimental(Kind.STATE)
+public class StateSpecs {
+
+  private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault();
+
+  private StateSpecs() {}
+
+  /** Create a simple state spec for values of type {@code T}. */
+  public static <T> StateSpec<ValueState<T>> value() {
+    return new ValueStateSpec<>(null);
+  }
+
+  /** Create a simple state spec for values of type {@code T}. */
+  public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
+    checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
+    return new ValueStateSpec<>(valueCoder);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+   * {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+      CombineFn<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
+   * multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+          CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+   * {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+          Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    checkArgument(accumCoder != null,
+        "accumCoder should not be null. "
+            + "Consider using combining(CombineFn<> combineFn) instead.");
+    return combiningInternal(accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
+   * multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+          Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+    return combiningInternal(accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+   * {@code InputT}s into a single {@code OutputT}.
+   *
+   * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should
+   * only be used to initialize static values.
+   */
+  public static <InputT, AccumT, OutputT>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>>
+  combiningFromInputInternal(
+              Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    try {
+      Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
+      return combiningInternal(accumCoder, combineFn);
+    } catch (CannotProvideCoderException e) {
+      throw new IllegalArgumentException(
+          "Unable to determine accumulator coder for "
+              + combineFn.getClass().getSimpleName()
+              + " from "
+              + inputCoder,
+          e);
+    }
+  }
+
+  private static <InputT, AccumT, OutputT>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+          Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+  }
+
+  private static <InputT, AccumT, OutputT>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+      Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
+   * all the values that have been added.
+   */
+  public static <T> StateSpec<BagState<T>> bag() {
+    return bag(null);
+  }
+
+  /**
+   * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
+   * all the values that have been added.
+   */
+  public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
+    return new BagStateSpec<>(elemCoder);
+  }
+
+  /**
+   * Create a state spec that supporting for {@link java.util.Set} like access patterns.
+   */
+  public static <T> StateSpec<SetState<T>> set() {
+    return set(null);
+  }
+
+  /**
+   * Create a state spec that supporting for {@link java.util.Set} like access patterns.
+   */
+  public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
+    return new SetStateSpec<>(elemCoder);
+  }
+
+  /**
+   * Create a state spec that supporting for {@link java.util.Map} like access patterns.
+   */
+  public static <K, V> StateSpec<MapState<K, V>> map() {
+    return new MapStateSpec<>(null, null);
+  }
+
+  /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */
+  public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
+    return new MapStateSpec<>(keyCoder, valueCoder);
+  }
+
+  /** Create a state spec for holding the watermark. */
+  public static
+      StateSpec<WatermarkHoldState> watermarkStateInternal(
+          TimestampCombiner timestampCombiner) {
+    return new WatermarkStateSpecInternal(timestampCombiner);
+  }
+
+  public static <InputT, AccumT, OutputT>
+      StateSpec<BagState<AccumT>> convertToBagSpecInternal(
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+    if (combiningSpec instanceof CombiningStateSpec) {
+      // Checked above; conversion to a bag spec depends on the provided spec being one of those
+      // created via the factory methods in this class.
+      @SuppressWarnings("unchecked")
+      CombiningStateSpec<InputT, AccumT, OutputT> typedSpec =
+          (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec;
+      return typedSpec.asBagSpec();
+    } else if (combiningSpec instanceof CombiningWithContextStateSpec) {
+      @SuppressWarnings("unchecked")
+      CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec =
+          (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec;
+      return typedSpec.asBagSpec();
+    } else {
+      throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
+    }
+  }
+
+  /**
+   * A specification for a state cell holding a settable value of type {@code T}.
+   *
+   * <p>Includes the coder for {@code T}.
+   */
+  private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
+
+    @Nullable
+    private Coder<T> coder;
+
+    private ValueStateSpec(@Nullable Coder<T> coder) {
+      this.coder = coder;
+    }
+
+    @Override
+    public ValueState<T> bind(String id, StateBinder visitor) {
+      return visitor.bindValue(id, this, coder);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void offerCoders(Coder[] coders) {
+      if (this.coder == null) {
+        if (coders[0] != null) {
+          this.coder = (Coder<T>) coders[0];
+        }
+      }
+    }
+
+    @Override public void finishSpecifying() {
+      if (coder == null) {
+        throw new IllegalStateException("Unable to infer a coder for ValueState and no Coder"
+            + " was specified. Please set a coder by either invoking"
+            + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the"
+            + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof ValueStateSpec)) {
+        return false;
+      }
+
+      ValueStateSpec<?> that = (ValueStateSpec<?>) obj;
+      return Objects.equals(this.coder, that.coder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), coder);
+    }
+  }
+
+  /**
+   * A specification for a state cell that is combined according to a {@link CombineFn}.
+   *
+   * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
+   */
+  private static class CombiningStateSpec<InputT, AccumT, OutputT>
+      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
+
+    @Nullable
+    private Coder<AccumT> accumCoder;
+    private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    private CombiningStateSpec(
+        @Nullable Coder<AccumT> accumCoder,
+        CombineFn<InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
+      this.accumCoder = accumCoder;
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> bind(
+        String id, StateBinder visitor) {
+      return visitor.bindCombining(id, this, accumCoder, combineFn);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void offerCoders(Coder[] coders) {
+      if (this.accumCoder == null) {
+        if (coders[1] != null) {
+          this.accumCoder = (Coder<AccumT>) coders[1];
+        }
+      }
+    }
+
+    @Override public void finishSpecifying() {
+      if (accumCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for"
+            + " CombiningState and no Coder was specified."
+            + " Please set a coder by either invoking"
+            + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+            + " CombineFn<InputT, AccumT, OutputT> combineFn)"
+            + " or by registering the coder in the Pipeline's CoderRegistry.");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof CombiningStateSpec)) {
+        return false;
+      }
+
+      CombiningStateSpec<?, ?, ?> that =
+          (CombiningStateSpec<?, ?, ?>) obj;
+      return Objects.equals(this.accumCoder, that.accumCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), accumCoder);
+    }
+
+    private StateSpec<BagState<AccumT>> asBagSpec() {
+      return new BagStateSpec<AccumT>(accumCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell that is combined according to a {@link
+   * CombineFnWithContext}.
+   *
+   * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
+   */
+  private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
+      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
+
+    @Nullable private Coder<AccumT> accumCoder;
+    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
+
+    private CombiningWithContextStateSpec(
+        @Nullable Coder<AccumT> accumCoder,
+        CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
+      this.accumCoder = accumCoder;
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> bind(
+        String id, StateBinder visitor) {
+      return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void offerCoders(Coder[] coders) {
+      if (this.accumCoder == null) {
+        if (coders[2] != null) {
+          this.accumCoder = (Coder<AccumT>) coders[2];
+        }
+      }
+    }
+
+    @Override
+    public void finishSpecifying() {
+      if (accumCoder == null) {
+        throw new IllegalStateException(
+            "Unable to infer a coder for"
+                + " CombiningWithContextState and no Coder was specified."
+                + " Please set a coder by either invoking"
+                + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder,"
+                + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)"
+                + " or by registering the coder in the Pipeline's CoderRegistry.");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof CombiningWithContextStateSpec)) {
+        return false;
+      }
+
+      CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj;
+      return Objects.equals(this.accumCoder, that.accumCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), accumCoder);
+    }
+
+    private StateSpec<BagState<AccumT>> asBagSpec() {
+      return new BagStateSpec<AccumT>(accumCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell supporting for bag-like access patterns
+   * (frequent additions, occasional reads of all the values).
+   *
+   * <p>Includes the coder for the element type {@code T}</p>
+   */
+  private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
+
+    @Nullable
+    private Coder<T> elemCoder;
+
+    private BagStateSpec(@Nullable Coder<T> elemCoder) {
+      this.elemCoder = elemCoder;
+    }
+
+    @Override
+    public BagState<T> bind(String id, StateBinder visitor) {
+      return visitor.bindBag(id, this, elemCoder);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void offerCoders(Coder[] coders) {
+      if (this.elemCoder == null) {
+        if (coders[0] != null) {
+          this.elemCoder = (Coder<T>) coders[0];
+        }
+      }
+    }
+
+    @Override public void finishSpecifying() {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for BagState and no Coder"
+            + " was specified. Please set a coder by either invoking"
+            + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the"
+            + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof BagStateSpec)) {
+        return false;
+      }
+
+      BagStateSpec<?> that = (BagStateSpec<?>) obj;
+      return Objects.equals(this.elemCoder, that.elemCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), elemCoder);
+    }
+  }
+
+  private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
+
+    @Nullable
+    private Coder<K> keyCoder;
+    @Nullable
+    private Coder<V> valueCoder;
+
+    private MapStateSpec(@Nullable Coder<K> keyCoder, @Nullable Coder<V> valueCoder) {
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+    }
+
+    @Override
+    public MapState<K, V> bind(String id, StateBinder visitor) {
+      return visitor.bindMap(id, this, keyCoder, valueCoder);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void offerCoders(Coder[] coders) {
+      if (this.keyCoder == null) {
+        if (coders[0] != null) {
+          this.keyCoder = (Coder<K>) coders[0];
+        }
+      }
+      if (this.valueCoder == null) {
+        if (coders[1] != null) {
+          this.valueCoder = (Coder<V>) coders[1];
+        }
+      }
+    }
+
+    @Override public void finishSpecifying() {
+      if (keyCoder == null || valueCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for MapState and no Coder"
+            + " was specified. Please set a coder by either invoking"
+            + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the"
+            + " coder in the Pipeline's CoderRegistry.");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof MapStateSpec)) {
+        return false;
+      }
+
+      MapStateSpec<?, ?> that = (MapStateSpec<?, ?>) obj;
+      return Objects.equals(this.keyCoder, that.keyCoder)
+          && Objects.equals(this.valueCoder, that.valueCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), keyCoder, valueCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell supporting for set-like access patterns.
+   *
+   * <p>Includes the coder for the element type {@code T}</p>
+   */
+  private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
+
+    @Nullable
+    private Coder<T> elemCoder;
+
+    private SetStateSpec(@Nullable Coder<T> elemCoder) {
+      this.elemCoder = elemCoder;
+    }
+
+    @Override
+    public SetState<T> bind(String id, StateBinder visitor) {
+      return visitor.bindSet(id, this, elemCoder);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void offerCoders(Coder[] coders) {
+      if (this.elemCoder == null) {
+        if (coders[0] != null) {
+          this.elemCoder = (Coder<T>) coders[0];
+        }
+      }
+    }
+
+    @Override public void finishSpecifying() {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for SetState and no Coder"
+            + " was specified. Please set a coder by either invoking"
+            + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the"
+            + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof SetStateSpec)) {
+        return false;
+      }
+
+      SetStateSpec<?> that = (SetStateSpec<?>) obj;
+      return Objects.equals(this.elemCoder, that.elemCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), elemCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell tracking a combined watermark hold.
+   *
+   * <p>Includes the {@link TimestampCombiner} according to which the output times
+   * are combined.
+   */
+  private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
+
+    /**
+     * When multiple output times are added to hold the watermark, this determines how they are
+     * combined, and also the behavior when merging windows. Does not contribute to equality/hash
+     * since we have at most one watermark hold spec per computation.
+     */
+    private final TimestampCombiner timestampCombiner;
+
+    private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
+      this.timestampCombiner = timestampCombiner;
+    }
+
+    @Override
+    public WatermarkHoldState bind(String id, StateBinder visitor) {
+      return visitor.bindWatermark(id, this, timestampCombiner);
+    }
+
+    @Override
+    public void offerCoders(Coder[] coders) {
+    }
+
+    @Override public void finishSpecifying() {
+      // Currently an empty implementation as there are no coders to validate.
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      // All instance of WatermarkHoldState are considered equal
+      return obj instanceof WatermarkStateSpecInternal;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java
new file mode 100644
index 0000000..ca97db2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * State holding a single value.
+ *
+ * @param <T> The type of values being stored.
+ */
+@Experimental(Kind.STATE)
+public interface ValueState<T> extends ReadableState<T>, State {
+  /**
+   * Set the value of the buffer.
+   */
+  void write(T input);
+
+  @Override
+  ValueState<T> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
new file mode 100644
index 0000000..9f6c203
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * A {@link State} accepting and aggregating output timestamps, which determines the time to which
+ * the output watermark must be held.
+ *
+ * <p><b><i>For internal use only. This API may change at any time.</i></b>
+ */
+@Experimental(Kind.STATE)
+public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
+  /**
+   * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
+   * given an element timestamp, and to combine watermarks from windows which are about to be
+   * merged.
+   */
+  TimestampCombiner getTimestampCombiner();
+
+  @Override
+  WatermarkHoldState readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java
new file mode 100644
index 0000000..de5eeeb
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines internal utilities for interacting with pipeline state.
+ */
+package org.apache.beam.sdk.state;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
index 9bced41..585d8b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.testing;
 
-import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.state.MapState;
 
 /**
  * Category tag for validation tests which utilize {@link MapState}.

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
index 6fd74bd..7d82d22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.testing;
 
-import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.state.SetState;
 
 /**
  * Category tag for validation tests which utilize {@link SetState}.

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index f3d178e..c858936 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
@@ -37,8 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index ca7427c..ead2569 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -320,7 +320,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public org.apache.beam.sdk.util.state.State state(String stateId) {
+            public org.apache.beam.sdk.state.State state(String stateId) {
               throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
             }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index e132115..6828979 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -23,16 +23,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 7dd2cdd..c45311a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 3c44afe..d5a1a94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
@@ -28,7 +29,6 @@ import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.state.State;
 
 /**
  * Interface for invoking the {@code DoFn} processing methods.

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 3219f96..72ad4b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
@@ -38,8 +40,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index bac3bef..3dfca8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -42,6 +42,8 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
@@ -58,8 +60,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index 31d1f64..f93cb0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index a394180..1b1c352 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -23,12 +23,12 @@ import java.io.ObjectOutputStream;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.StateContext;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.state.StateContext;
 
 /**
  * Static utility methods that create combine function instances.

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
deleted file mode 100644
index e0eebe5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-/**
- * State containing a bag values. Items can be added to the bag and the contents read out.
- *
- * @param <T> The type of elements in the bag.
- */
-public interface BagState<T> extends GroupingState<T, Iterable<T>> {
-  @Override
-  BagState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
deleted file mode 100644
index 80e4dc9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link GroupingState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, AccumT, OutputT>
-    extends GroupingState<InputT, OutputT> {
-
-  /**
-   * Read the merged accumulator for this combining value. It is implied that reading the
-   * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
-   * this.
-   */
-  AccumT getAccum();
-
-  /**
-   * Add an accumulator to this combining value. Depending on implementation this may immediately
-   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
-   */
-  void addAccum(AccumT accum);
-
-  /**
-   * Merge the given accumulators according to the underlying combiner.
-   */
-  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
-  @Override
-  CombiningState<InputT, AccumT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
deleted file mode 100644
index bd7a8d9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
-  /**
-   * Add a value to the buffer.
-   */
-  void add(InputT value);
-
-  /**
-   * Return true if this state is empty.
-   */
-  ReadableState<Boolean> isEmpty();
-
-  @Override
-  GroupingState<InputT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
deleted file mode 100644
index fb7e807..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import java.util.Map;
-
-/**
- * An object that maps keys to values.
- * A map cannot contain duplicate keys;
- * each key can map to at most one value.
- *
- * @param <K> the type of keys maintained by this map
- * @param <V> the type of mapped values
- */
-public interface MapState<K, V> extends State {
-
-  /**
-   * Associates the specified value with the specified key in this state.
-   */
-  void put(K key, V value);
-
-  /**
-   * A deferred read-followed-by-write.
-   *
-   * <p>When {@code read()} is called on the result or state is committed, it forces a read of the
-   * map and reconciliation with any pending modifications.
-   *
-   * <p>If the specified key is not already associated with a value (or is mapped to {@code null})
-   * associates it with the given value and returns {@code null}, else returns the current value.
-   */
-  ReadableState<V> putIfAbsent(K key, V value);
-
-  /**
-   * Removes the mapping for a key from this map if it is present.
-   */
-  void remove(K key);
-
-  /**
-   * A deferred lookup.
-   *
-   * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()}
-   * on the results.
-   *
-   * <p>When {@code read()} is called, a particular state implementation is encouraged to perform
-   * all pending reads in a single batch.
-   */
-  ReadableState<V> get(K key);
-
-  /**
-   * Returns a iterable view of the keys contained in this map.
-   */
-  ReadableState<Iterable<K>> keys();
-
-  /**
-   * Returns a iterable view of the values contained in this map.
-   */
-  ReadableState<Iterable<V>> values();
-
-  /**
-   * Returns a iterable view of all key-values.
-   */
-  ReadableState<Iterable<Map.Entry<K, V>>> entries();
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
deleted file mode 100644
index c3e9936..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * {@link State} that can be read via {@link #read()}.
- *
- * <p>Use {@link #readLater()} for marking several states for prefetching. Runners
- * can potentially batch these into one read.
- *
- * @param <T> The type of value returned by {@link #read}.
- */
-@Experimental(Kind.STATE)
-public interface ReadableState<T> {
-  /**
-   * Read the current value, blocking until it is available.
-   *
-   * <p>If there will be many calls to {@link #read} for different state in short succession,
-   * you should first call {@link #readLater} for all of them so the reads can potentially be
-   * batched (depending on the underlying implementation}.
-   */
-  T read();
-
-  /**
-   * Indicate that the value will be read later.
-   *
-   * <p>This allows an implementation to start an asynchronous prefetch or
-   * to include this state in the next batch of reads.
-   *
-   * @return this for convenient chaining
-   */
-  ReadableState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
deleted file mode 100644
index 819eda6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * Utilities for constructing and manipulating {@link ReadableState} instances.
- */
-@Experimental(Kind.STATE)
-public class ReadableStates {
-
-  /**
-   * A {@link ReadableState} constructed from a constant value, hence immediately available.
-   */
-  public static <T> ReadableState<T> immediate(final T value) {
-    return new ReadableState<T>() {
-      @Override
-      public T read() {
-        return value;
-      }
-
-      @Override
-      public ReadableState<T> readLater() {
-        return this;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
deleted file mode 100644
index 56ea510..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-/**
- * State containing no duplicate elements.
- * Items can be added to the set and the contents read out.
- *
- * @param <T> The type of elements in the set.
- */
-public interface SetState<T> extends GroupingState<T, Iterable<T>> {
-  /**
-   * Returns true if this set contains the specified element.
-   */
-  ReadableState<Boolean> contains(T t);
-
-  /**
-   * Ensures a value is a member of the set, returning {@code true} if it was added and {@code
-   * false} otherwise.
-   */
-  ReadableState<Boolean> addIfAbsent(T t);
-
-  /**
-   * Removes the specified element from this set if it is present.
-   */
-  void remove(T t);
-
-  @Override
-  SetState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
deleted file mode 100644
index 3a49f01..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-/**
- * Base interface for all state locations.
- *
- * <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
- */
-public interface State {
-
-  /**
-   * Clear out the state location.
-   */
-  void clear();
-}