You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:08 UTC

[02/19] beam git commit: Move Java sdk.util.state to sdk.state

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
deleted file mode 100644
index 48fa742..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-
-/**
- * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
- */
-public interface StateBinder {
-  <T> ValueState<T> bindValue(
-      String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
-
-  <T> BagState<T> bindBag(
-      String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
-
-  <T> SetState<T> bindSet(
-      String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
-
-  <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-      String id,
-      StateSpec<MapState<KeyT, ValueT>> spec,
-      Coder<KeyT> mapKeyCoder,
-      Coder<ValueT> mapValueCoder);
-
-  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
-      String id,
-      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
-      Coder<AccumT> accumCoder,
-      Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
-
-  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
-      String id,
-      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
-      Coder<AccumT> accumCoder,
-      CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
-
-  /**
-   * Bind to a watermark {@link StateSpec}.
-   *
-   * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
-   * to the returned {@link WatermarkHoldState} are to be combined.
-   */
-  WatermarkHoldState bindWatermark(
-      String id,
-      StateSpec<WatermarkHoldState> spec,
-      TimestampCombiner timestampCombiner);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java
deleted file mode 100644
index 887a5f1..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Information accessible the state API.
- */
-public interface StateContext<W extends BoundedWindow> {
-  /**
-   * Returns the {@code PipelineOptions} specified with the
-   * {@link org.apache.beam.sdk.runners.PipelineRunner}.
-   */
-  PipelineOptions getPipelineOptions();
-
-  /**
-   * Returns the value of the side input for the corresponding state window.
-   */
-  <T> T sideInput(PCollectionView<T> view);
-
-  /**
-   * Returns the window corresponding to the state.
-   */
-  W window();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
deleted file mode 100644
index 2ce9594..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Factory that produces {@link StateContext} based on different inputs.
- */
-public class StateContexts {
-  private static final StateContext<BoundedWindow> NULL_CONTEXT =
-      new StateContext<BoundedWindow>() {
-        @Override
-        public PipelineOptions getPipelineOptions() {
-          throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view) {
-          throw new IllegalArgumentException("cannot call sideInput() in a null context");
-        }
-
-        @Override
-        public BoundedWindow window() {
-          throw new IllegalArgumentException("cannot call window() in a null context");
-        }
-      };
-
-  /** Returns a fake {@link StateContext}. */
-  @SuppressWarnings("unchecked")
-  public static <W extends BoundedWindow> StateContext<W> nullContext() {
-    return (StateContext<W>) NULL_CONTEXT;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
deleted file mode 100644
index 8eda218..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
-
-/**
- * A specification of a persistent state cell. This includes information necessary to encode the
- * value and details about the intended access pattern.
- *
- * @param <StateT> The type of state being described.
- */
-@Experimental(Kind.STATE)
-public interface StateSpec<StateT extends State> extends Serializable {
-
-  /**
-   * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
-   */
-  StateT bind(String id, StateBinder binder);
-
-  /**
-   * Given {code coders} are inferred from type arguments defined for this class. Coders which are
-   * already set should take precedence over offered coders.
-   *
-   * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
-   *     coder could not be inferred.
-   */
-  void offerCoders(Coder[] coders);
-
-  /**
-   * Validates that this {@link StateSpec} has been specified correctly and finalizes it.
-   * Automatically invoked when the pipeline is built.
-   */
-  void finishSpecifying();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
deleted file mode 100644
index 49d5722..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ /dev/null
@@ -1,629 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-
-/**
- * Static utility methods for creating {@link StateSpec} instances.
- */
-@Experimental(Kind.STATE)
-public class StateSpecs {
-
-  private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault();
-
-  private StateSpecs() {}
-
-  /** Create a simple state spec for values of type {@code T}. */
-  public static <T> StateSpec<ValueState<T>> value() {
-    return new ValueStateSpec<>(null);
-  }
-
-  /** Create a simple state spec for values of type {@code T}. */
-  public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
-    checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
-    return new ValueStateSpec<>(valueCoder);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
-   * {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <InputT, AccumT, OutputT>
-  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
-      CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <InputT, AccumT, OutputT>
-      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
-          CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
-   * {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <InputT, AccumT, OutputT>
-  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
-          Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    checkArgument(accumCoder != null,
-        "accumCoder should not be null. "
-            + "Consider using combining(CombineFn<> combineFn) instead.");
-    return combiningInternal(accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <InputT, AccumT, OutputT>
-      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
-          Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
-    return combiningInternal(accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
-   * {@code InputT}s into a single {@code OutputT}.
-   *
-   * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should
-   * only be used to initialize static values.
-   */
-  public static <InputT, AccumT, OutputT>
-  StateSpec<CombiningState<InputT, AccumT, OutputT>>
-  combiningFromInputInternal(
-              Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    try {
-      Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
-      return combiningInternal(accumCoder, combineFn);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to determine accumulator coder for "
-              + combineFn.getClass().getSimpleName()
-              + " from "
-              + inputCoder,
-          e);
-    }
-  }
-
-  private static <InputT, AccumT, OutputT>
-  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
-          Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
-  }
-
-  private static <InputT, AccumT, OutputT>
-  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
-      Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
-   * all the values that have been added.
-   */
-  public static <T> StateSpec<BagState<T>> bag() {
-    return bag(null);
-  }
-
-  /**
-   * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
-   * all the values that have been added.
-   */
-  public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
-    return new BagStateSpec<>(elemCoder);
-  }
-
-  /**
-   * Create a state spec that supporting for {@link java.util.Set} like access patterns.
-   */
-  public static <T> StateSpec<SetState<T>> set() {
-    return set(null);
-  }
-
-  /**
-   * Create a state spec that supporting for {@link java.util.Set} like access patterns.
-   */
-  public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
-    return new SetStateSpec<>(elemCoder);
-  }
-
-  /**
-   * Create a state spec that supporting for {@link java.util.Map} like access patterns.
-   */
-  public static <K, V> StateSpec<MapState<K, V>> map() {
-    return new MapStateSpec<>(null, null);
-  }
-
-  /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */
-  public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
-    return new MapStateSpec<>(keyCoder, valueCoder);
-  }
-
-  /** Create a state spec for holding the watermark. */
-  public static
-      StateSpec<WatermarkHoldState> watermarkStateInternal(
-          TimestampCombiner timestampCombiner) {
-    return new WatermarkStateSpecInternal(timestampCombiner);
-  }
-
-  public static <InputT, AccumT, OutputT>
-      StateSpec<BagState<AccumT>> convertToBagSpecInternal(
-          StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
-    if (combiningSpec instanceof CombiningStateSpec) {
-      // Checked above; conversion to a bag spec depends on the provided spec being one of those
-      // created via the factory methods in this class.
-      @SuppressWarnings("unchecked")
-      CombiningStateSpec<InputT, AccumT, OutputT> typedSpec =
-          (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec;
-      return typedSpec.asBagSpec();
-    } else if (combiningSpec instanceof CombiningWithContextStateSpec) {
-      @SuppressWarnings("unchecked")
-      CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec =
-          (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec;
-      return typedSpec.asBagSpec();
-    } else {
-      throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
-    }
-  }
-
-  /**
-   * A specification for a state cell holding a settable value of type {@code T}.
-   *
-   * <p>Includes the coder for {@code T}.
-   */
-  private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
-
-    @Nullable
-    private Coder<T> coder;
-
-    private ValueStateSpec(@Nullable Coder<T> coder) {
-      this.coder = coder;
-    }
-
-    @Override
-    public ValueState<T> bind(String id, StateBinder visitor) {
-      return visitor.bindValue(id, this, coder);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.coder == null) {
-        if (coders[0] != null) {
-          this.coder = (Coder<T>) coders[0];
-        }
-      }
-    }
-
-    @Override public void finishSpecifying() {
-      if (coder == null) {
-        throw new IllegalStateException("Unable to infer a coder for ValueState and no Coder"
-            + " was specified. Please set a coder by either invoking"
-            + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the"
-            + " Pipeline's CoderRegistry.");
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof ValueStateSpec)) {
-        return false;
-      }
-
-      ValueStateSpec<?> that = (ValueStateSpec<?>) obj;
-      return Objects.equals(this.coder, that.coder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), coder);
-    }
-  }
-
-  /**
-   * A specification for a state cell that is combined according to a {@link CombineFn}.
-   *
-   * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
-   */
-  private static class CombiningStateSpec<InputT, AccumT, OutputT>
-      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
-
-    @Nullable
-    private Coder<AccumT> accumCoder;
-    private final CombineFn<InputT, AccumT, OutputT> combineFn;
-
-    private CombiningStateSpec(
-        @Nullable Coder<AccumT> accumCoder,
-        CombineFn<InputT, AccumT, OutputT> combineFn) {
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder visitor) {
-      return visitor.bindCombining(id, this, accumCoder, combineFn);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.accumCoder == null) {
-        if (coders[1] != null) {
-          this.accumCoder = (Coder<AccumT>) coders[1];
-        }
-      }
-    }
-
-    @Override public void finishSpecifying() {
-      if (accumCoder == null) {
-        throw new IllegalStateException("Unable to infer a coder for"
-            + " CombiningState and no Coder was specified."
-            + " Please set a coder by either invoking"
-            + " StateSpecs.combining(Coder<AccumT> accumCoder,"
-            + " CombineFn<InputT, AccumT, OutputT> combineFn)"
-            + " or by registering the coder in the Pipeline's CoderRegistry.");
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof CombiningStateSpec)) {
-        return false;
-      }
-
-      CombiningStateSpec<?, ?, ?> that =
-          (CombiningStateSpec<?, ?, ?>) obj;
-      return Objects.equals(this.accumCoder, that.accumCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), accumCoder);
-    }
-
-    private StateSpec<BagState<AccumT>> asBagSpec() {
-      return new BagStateSpec<AccumT>(accumCoder);
-    }
-  }
-
-  /**
-   * A specification for a state cell that is combined according to a {@link
-   * CombineFnWithContext}.
-   *
-   * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
-   */
-  private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
-      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
-
-    @Nullable private Coder<AccumT> accumCoder;
-    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
-
-    private CombiningWithContextStateSpec(
-        @Nullable Coder<AccumT> accumCoder,
-        CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder visitor) {
-      return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.accumCoder == null) {
-        if (coders[2] != null) {
-          this.accumCoder = (Coder<AccumT>) coders[2];
-        }
-      }
-    }
-
-    @Override
-    public void finishSpecifying() {
-      if (accumCoder == null) {
-        throw new IllegalStateException(
-            "Unable to infer a coder for"
-                + " CombiningWithContextState and no Coder was specified."
-                + " Please set a coder by either invoking"
-                + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder,"
-                + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)"
-                + " or by registering the coder in the Pipeline's CoderRegistry.");
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof CombiningWithContextStateSpec)) {
-        return false;
-      }
-
-      CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj;
-      return Objects.equals(this.accumCoder, that.accumCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), accumCoder);
-    }
-
-    private StateSpec<BagState<AccumT>> asBagSpec() {
-      return new BagStateSpec<AccumT>(accumCoder);
-    }
-  }
-
-  /**
-   * A specification for a state cell supporting for bag-like access patterns
-   * (frequent additions, occasional reads of all the values).
-   *
-   * <p>Includes the coder for the element type {@code T}</p>
-   */
-  private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
-
-    @Nullable
-    private Coder<T> elemCoder;
-
-    private BagStateSpec(@Nullable Coder<T> elemCoder) {
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public BagState<T> bind(String id, StateBinder visitor) {
-      return visitor.bindBag(id, this, elemCoder);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.elemCoder == null) {
-        if (coders[0] != null) {
-          this.elemCoder = (Coder<T>) coders[0];
-        }
-      }
-    }
-
-    @Override public void finishSpecifying() {
-      if (elemCoder == null) {
-        throw new IllegalStateException("Unable to infer a coder for BagState and no Coder"
-            + " was specified. Please set a coder by either invoking"
-            + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the"
-            + " Pipeline's CoderRegistry.");
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof BagStateSpec)) {
-        return false;
-      }
-
-      BagStateSpec<?> that = (BagStateSpec<?>) obj;
-      return Objects.equals(this.elemCoder, that.elemCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), elemCoder);
-    }
-  }
-
-  private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
-
-    @Nullable
-    private Coder<K> keyCoder;
-    @Nullable
-    private Coder<V> valueCoder;
-
-    private MapStateSpec(@Nullable Coder<K> keyCoder, @Nullable Coder<V> valueCoder) {
-      this.keyCoder = keyCoder;
-      this.valueCoder = valueCoder;
-    }
-
-    @Override
-    public MapState<K, V> bind(String id, StateBinder visitor) {
-      return visitor.bindMap(id, this, keyCoder, valueCoder);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.keyCoder == null) {
-        if (coders[0] != null) {
-          this.keyCoder = (Coder<K>) coders[0];
-        }
-      }
-      if (this.valueCoder == null) {
-        if (coders[1] != null) {
-          this.valueCoder = (Coder<V>) coders[1];
-        }
-      }
-    }
-
-    @Override public void finishSpecifying() {
-      if (keyCoder == null || valueCoder == null) {
-        throw new IllegalStateException("Unable to infer a coder for MapState and no Coder"
-            + " was specified. Please set a coder by either invoking"
-            + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the"
-            + " coder in the Pipeline's CoderRegistry.");
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof MapStateSpec)) {
-        return false;
-      }
-
-      MapStateSpec<?, ?> that = (MapStateSpec<?, ?>) obj;
-      return Objects.equals(this.keyCoder, that.keyCoder)
-          && Objects.equals(this.valueCoder, that.valueCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), keyCoder, valueCoder);
-    }
-  }
-
-  /**
-   * A specification for a state cell supporting for set-like access patterns.
-   *
-   * <p>Includes the coder for the element type {@code T}</p>
-   */
-  private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
-
-    @Nullable
-    private Coder<T> elemCoder;
-
-    private SetStateSpec(@Nullable Coder<T> elemCoder) {
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public SetState<T> bind(String id, StateBinder visitor) {
-      return visitor.bindSet(id, this, elemCoder);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.elemCoder == null) {
-        if (coders[0] != null) {
-          this.elemCoder = (Coder<T>) coders[0];
-        }
-      }
-    }
-
-    @Override public void finishSpecifying() {
-      if (elemCoder == null) {
-        throw new IllegalStateException("Unable to infer a coder for SetState and no Coder"
-            + " was specified. Please set a coder by either invoking"
-            + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the"
-            + " Pipeline's CoderRegistry.");
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof SetStateSpec)) {
-        return false;
-      }
-
-      SetStateSpec<?> that = (SetStateSpec<?>) obj;
-      return Objects.equals(this.elemCoder, that.elemCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), elemCoder);
-    }
-  }
-
-  /**
-   * A specification for a state cell tracking a combined watermark hold.
-   *
-   * <p>Includes the {@link TimestampCombiner} according to which the output times
-   * are combined.
-   */
-  private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
-
-    /**
-     * When multiple output times are added to hold the watermark, this determines how they are
-     * combined, and also the behavior when merging windows. Does not contribute to equality/hash
-     * since we have at most one watermark hold spec per computation.
-     */
-    private final TimestampCombiner timestampCombiner;
-
-    private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
-      this.timestampCombiner = timestampCombiner;
-    }
-
-    @Override
-    public WatermarkHoldState bind(String id, StateBinder visitor) {
-      return visitor.bindWatermark(id, this, timestampCombiner);
-    }
-
-    @Override
-    public void offerCoders(Coder[] coders) {
-    }
-
-    @Override public void finishSpecifying() {
-      // Currently an empty implementation as there are no coders to validate.
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      // All instance of WatermarkHoldState are considered equal
-      return obj instanceof WatermarkStateSpecInternal;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java
deleted file mode 100644
index b432203..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * State holding a single value.
- *
- * @param <T> The type of values being stored.
- */
-@Experimental(Kind.STATE)
-public interface ValueState<T> extends ReadableState<T>, State {
-  /**
-   * Set the value of the buffer.
-   */
-  void write(T input);
-
-  @Override
-  ValueState<T> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
deleted file mode 100644
index ae9b700..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * A {@link State} accepting and aggregating output timestamps, which determines the time to which
- * the output watermark must be held.
- *
- * <p><b><i>For internal use only. This API may change at any time.</i></b>
- */
-@Experimental(Kind.STATE)
-public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
-  /**
-   * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
-   * given an element timestamp, and to combine watermarks from windows which are about to be
-   * merged.
-   */
-  TimestampCombiner getTimestampCombiner();
-
-  @Override
-  WatermarkHoldState readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java
deleted file mode 100644
index b9bec16..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Defines internal utilities for interacting with pipeline state.
- */
-package org.apache.beam.sdk.util.state;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 56051a6..1d41923 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -64,6 +64,13 @@ import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -92,13 +99,6 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index fe96e87..13e46d5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -38,6 +38,9 @@ import java.util.List;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
@@ -50,9 +53,6 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index f099d5d..27e0b89 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -29,6 +29,10 @@ import static org.junit.Assert.fail;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
@@ -41,10 +45,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matcher;

http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
index 798e8dc..d16671b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
@@ -27,10 +27,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.NotSerializableException;
 import java.io.ObjectOutputStream;
 import java.util.List;
+import org.apache.beam.sdk.state.StateContexts;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.StateContexts;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;