You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:09 UTC
[03/19] beam git commit: Move Java sdk.util.state to sdk.state
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 28bbc3c..1db0e86 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -373,7 +373,7 @@
<!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
</Match>
<Match>
- <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/>
+ <Class name="org.apache.beam.sdk.state.StateSpecs$CombiningStateSpec"/>
<Method name="equals"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
<!--[BEAM-421] Class doesn't override equals in superclass-->
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java
new file mode 100644
index 0000000..189d151
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+/**
+ * State containing a bag of values. Items can be added to the bag and the contents read out.
+ *
+ * @param <T> The type of elements in the bag.
+ */
+public interface BagState<T> extends GroupingState<T, Iterable<T>> {
+ @Override
+ BagState<T> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java
new file mode 100644
index 0000000..6080127
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
+ * to {@link GroupingState} that includes the {@code AccumT} type.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <AccumT> the type of accumulator
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface CombiningState<InputT, AccumT, OutputT>
+ extends GroupingState<InputT, OutputT> {
+
+ /**
+ * Read the merged accumulator for this combining value. It is implied that reading the
+ * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for
+ * this.
+ */
+ AccumT getAccum();
+
+ /**
+ * Add an accumulator to this combining value. Depending on implementation this may immediately
+ * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
+ */
+ void addAccum(AccumT accum);
+
+ /**
+ * Merge the given accumulators according to the underlying combiner.
+ */
+ AccumT mergeAccumulators(Iterable<AccumT> accumulators);
+
+ @Override
+ CombiningState<InputT, AccumT, OutputT> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
new file mode 100644
index 0000000..3a12e79
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
+ * {@code OutputT} value.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
+ /**
+ * Add a value to the buffer.
+ */
+ void add(InputT value);
+
+ /**
+ * Return true if this state is empty.
+ */
+ ReadableState<Boolean> isEmpty();
+
+ @Override
+ GroupingState<InputT, OutputT> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
new file mode 100644
index 0000000..9f0eee9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import java.util.Map;
+
+/**
+ * An object that maps keys to values.
+ * A map cannot contain duplicate keys;
+ * each key can map to at most one value.
+ *
+ * @param <K> the type of keys maintained by this map
+ * @param <V> the type of mapped values
+ */
+public interface MapState<K, V> extends State {
+
+ /**
+ * Associates the specified value with the specified key in this state.
+ */
+ void put(K key, V value);
+
+ /**
+ * A deferred read-followed-by-write.
+ *
+ * <p>When {@code read()} is called on the result or state is committed, it forces a read of the
+ * map and reconciliation with any pending modifications.
+ *
+ * <p>If the specified key is not already associated with a value (or is mapped to {@code null})
+ * associates it with the given value and returns {@code null}, else returns the current value.
+ */
+ ReadableState<V> putIfAbsent(K key, V value);
+
+ /**
+ * Removes the mapping for a key from this map if it is present.
+ */
+ void remove(K key);
+
+ /**
+ * A deferred lookup.
+ *
+ * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()}
+ * on the results.
+ *
+ * <p>When {@code read()} is called, a particular state implementation is encouraged to perform
+ * all pending reads in a single batch.
+ */
+ ReadableState<V> get(K key);
+
+ /**
+ * Returns an iterable view of the keys contained in this map.
+ */
+ ReadableState<Iterable<K>> keys();
+
+ /**
+ * Returns an iterable view of the values contained in this map.
+ */
+ ReadableState<Iterable<V>> values();
+
+ /**
+ * Returns an iterable view of all key-values.
+ */
+ ReadableState<Iterable<Map.Entry<K, V>>> entries();
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
new file mode 100644
index 0000000..b29ab26
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * {@link State} that can be read via {@link #read()}.
+ *
+ * <p>Use {@link #readLater()} for marking several states for prefetching. Runners
+ * can potentially batch these into one read.
+ *
+ * @param <T> The type of value returned by {@link #read}.
+ */
+@Experimental(Kind.STATE)
+public interface ReadableState<T> {
+ /**
+ * Read the current value, blocking until it is available.
+ *
+ * <p>If there will be many calls to {@link #read} for different state in short succession,
+ * you should first call {@link #readLater} for all of them so the reads can potentially be
+ * batched (depending on the underlying implementation).
+ */
+ T read();
+
+ /**
+ * Indicate that the value will be read later.
+ *
+ * <p>This allows an implementation to start an asynchronous prefetch or
+ * to include this state in the next batch of reads.
+ *
+ * @return this for convenient chaining
+ */
+ ReadableState<T> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java
new file mode 100644
index 0000000..d8df04e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Utilities for constructing and manipulating {@link ReadableState} instances.
+ */
+@Experimental(Kind.STATE)
+public class ReadableStates {
+
+ /**
+ * A {@link ReadableState} constructed from a constant value, hence immediately available.
+ */
+ public static <T> ReadableState<T> immediate(final T value) {
+ return new ReadableState<T>() {
+ @Override
+ public T read() {
+ return value;
+ }
+
+ @Override
+ public ReadableState<T> readLater() {
+ return this;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
new file mode 100644
index 0000000..14aa640
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+/**
+ * State containing no duplicate elements.
+ * Items can be added to the set and the contents read out.
+ *
+ * @param <T> The type of elements in the set.
+ */
+public interface SetState<T> extends GroupingState<T, Iterable<T>> {
+ /**
+ * Returns true if this set contains the specified element.
+ */
+ ReadableState<Boolean> contains(T t);
+
+ /**
+ * Ensures a value is a member of the set, returning {@code true} if it was added and {@code
+ * false} otherwise.
+ */
+ ReadableState<Boolean> addIfAbsent(T t);
+
+ /**
+ * Removes the specified element from this set if it is present.
+ */
+ void remove(T t);
+
+ @Override
+ SetState<T> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java
new file mode 100644
index 0000000..6b10c91
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+/**
+ * Base interface for all state locations.
+ *
+ * <p>Specific types of state add appropriate accessors for reading and writing values, see
+ * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
+ */
+public interface State {
+
+ /**
+ * Clear out the state location.
+ */
+ void clear();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java
new file mode 100644
index 0000000..ee4aa78
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+
+/**
+ * Visitor for binding a {@link StateSpec} to the associated {@link State}.
+ */
+public interface StateBinder {
+ <T> ValueState<T> bindValue(
+ String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
+
+ <T> BagState<T> bindBag(
+ String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
+
+ <T> SetState<T> bindSet(
+ String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
+
+ <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ String id,
+ StateSpec<MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder);
+
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
+
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
+
+ /**
+ * Bind to a watermark {@link StateSpec}.
+ *
+ * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
+ * to the returned {@link WatermarkHoldState} are to be combined.
+ */
+ WatermarkHoldState bindWatermark(
+ String id,
+ StateSpec<WatermarkHoldState> spec,
+ TimestampCombiner timestampCombiner);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java
new file mode 100644
index 0000000..110a515
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Information accessible to the state API.
+ */
+public interface StateContext<W extends BoundedWindow> {
+ /**
+ * Returns the {@code PipelineOptions} specified with the
+ * {@link org.apache.beam.sdk.runners.PipelineRunner}.
+ */
+ PipelineOptions getPipelineOptions();
+
+ /**
+ * Returns the value of the side input for the corresponding state window.
+ */
+ <T> T sideInput(PCollectionView<T> view);
+
+ /**
+ * Returns the window corresponding to the state.
+ */
+ W window();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java
new file mode 100644
index 0000000..63afe4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Factory that produces {@link StateContext} based on different inputs.
+ */
+public class StateContexts {
+ private static final StateContext<BoundedWindow> NULL_CONTEXT =
+ new StateContext<BoundedWindow>() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new IllegalArgumentException("cannot call sideInput() in a null context");
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalArgumentException("cannot call window() in a null context");
+ }
+ };
+
+ /** Returns a fake {@link StateContext}. */
+ @SuppressWarnings("unchecked")
+ public static <W extends BoundedWindow> StateContext<W> nullContext() {
+ return (StateContext<W>) NULL_CONTEXT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
new file mode 100644
index 0000000..3b0b840
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * A specification of a persistent state cell. This includes information necessary to encode the
+ * value and details about the intended access pattern.
+ *
+ * @param <StateT> The type of state being described.
+ */
+@Experimental(Kind.STATE)
+public interface StateSpec<StateT extends State> extends Serializable {
+
+ /**
+ * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
+ */
+ StateT bind(String id, StateBinder binder);
+
+ /**
+ * Given {@code coders} are inferred from type arguments defined for this class. Coders which are
+ * already set should take precedence over offered coders.
+ *
+ * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
+ * coder could not be inferred.
+ */
+ void offerCoders(Coder[] coders);
+
+ /**
+ * Validates that this {@link StateSpec} has been specified correctly and finalizes it.
+ * Automatically invoked when the pipeline is built.
+ */
+ void finishSpecifying();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
new file mode 100644
index 0000000..09cc4e7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+
+/**
+ * Static utility methods for creating {@link StateSpec} instances.
+ */
+@Experimental(Kind.STATE)
+public class StateSpecs {
+
+ // Default coder registry; combiningFromInputInternal uses it to look up accumulator coders.
+ private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault();
+
+ // Private constructor: this class only offers static factory methods.
+ private StateSpecs() {}
+
+ /** Create a simple state spec for values of type {@code T}. */
+ public static <T> StateSpec<ValueState<T>> value() {
+ return new ValueStateSpec<>(null);
+ }
+
+ /** Create a simple state spec for values of type {@code T}. */
+ public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
+ checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
+ return new ValueStateSpec<>(valueCoder);
+ }
+
+ /**
+  * Builds a state spec whose values are merged by a {@link CombineFn}: multiple {@code InputT}s
+  * are automatically combined into a single {@code OutputT}.
+  *
+  * <p>The returned spec starts without an accumulator coder.
+  */
+ public static <InputT, AccumT, OutputT>
+     StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+         CombineFn<InputT, AccumT, OutputT> combineFn) {
+   Coder<AccumT> noCoderYet = null;
+   return new CombiningStateSpec<>(noCoderYet, combineFn);
+ }
+
+ /**
+  * Builds a state spec whose values are merged by a {@link CombineFnWithContext}: multiple
+  * {@code InputT}s are automatically combined into a single {@code OutputT}.
+  *
+  * <p>The returned spec starts without an accumulator coder.
+  */
+ public static <InputT, AccumT, OutputT>
+     StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+         CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+   Coder<AccumT> noCoderYet = null;
+   return new CombiningWithContextStateSpec<>(noCoderYet, combineFn);
+ }
+
+ /**
+  * Builds a state spec whose values are merged by a {@link CombineFn}, using the supplied coder
+  * for the accumulator.
+  *
+  * @param accumCoder coder for the accumulator type; must not be null
+  * @throws IllegalArgumentException if {@code accumCoder} is null
+  */
+ public static <InputT, AccumT, OutputT>
+     StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+         Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+   String nullCoderMessage =
+       "accumCoder should not be null. Consider using "
+           + "combining(CombineFn<> combineFn) instead.";
+   checkArgument(accumCoder != null, nullCoderMessage);
+   return new CombiningStateSpec<>(accumCoder, combineFn);
+ }
+
+ /**
+  * Builds a state spec whose values are merged by a {@link CombineFnWithContext}, using the
+  * supplied coder for the accumulator.
+  *
+  * <p>NOTE(review): unlike the {@link CombineFn} overload, this variant does not reject a null
+  * {@code accumCoder}; confirm whether that asymmetry is intended.
+  */
+ public static <InputT, AccumT, OutputT>
+     StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+         Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+   return new CombiningWithContextStateSpec<>(accumCoder, combineFn);
+ }
+
+ /**
+ * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+ * {@code InputT}s into a single {@code OutputT}.
+ *
+ * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should
+ * only be used to initialize static values.
+ */
+ public static <InputT, AccumT, OutputT>
+ StateSpec<CombiningState<InputT, AccumT, OutputT>>
+ combiningFromInputInternal(
+ Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ try {
+ Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
+ return combiningInternal(accumCoder, combineFn);
+ } catch (CannotProvideCoderException e) {
+ throw new IllegalArgumentException(
+ "Unable to determine accumulator coder for "
+ + combineFn.getClass().getSimpleName()
+ + " from "
+ + inputCoder,
+ e);
+ }
+ }
+
+ /** Wraps the coder and {@link CombineFn} in a {@code CombiningStateSpec} without validation. */
+ private static <InputT, AccumT, OutputT>
+     StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+         Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+   CombiningStateSpec<InputT, AccumT, OutputT> spec =
+       new CombiningStateSpec<>(accumCoder, combineFn);
+   return spec;
+ }
+
+ /**
+  * Wraps the coder and {@link CombineFnWithContext} in a {@code CombiningWithContextStateSpec}
+  * without validation.
+  */
+ private static <InputT, AccumT, OutputT>
+     StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+         Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+   CombiningWithContextStateSpec<InputT, AccumT, OutputT> spec =
+       new CombiningWithContextStateSpec<>(accumCoder, combineFn);
+   return spec;
+ }
+
+ /**
+  * Builds a state spec optimized for frequent additions and occasional retrieval of everything
+  * that has been added.
+  *
+  * <p>The returned spec starts without an element coder.
+  */
+ public static <T> StateSpec<BagState<T>> bag() {
+   return new BagStateSpec<T>(null);
+ }
+
+ /**
+  * Builds a state spec optimized for frequent additions and occasional retrieval of everything
+  * that has been added, using the given element coder.
+  */
+ public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
+   BagStateSpec<T> spec = new BagStateSpec<>(elemCoder);
+   return spec;
+ }
+
+ /**
+  * Builds a state spec that supports {@link java.util.Set}-like access patterns.
+  *
+  * <p>The returned spec starts without an element coder.
+  */
+ public static <T> StateSpec<SetState<T>> set() {
+   return new SetStateSpec<T>(null);
+ }
+
+ /**
+  * Builds a state spec that supports {@link java.util.Set}-like access patterns, using the given
+  * element coder.
+  */
+ public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
+   SetStateSpec<T> spec = new SetStateSpec<>(elemCoder);
+   return spec;
+ }
+
+ /**
+ * Create a state spec that supporting for {@link java.util.Map} like access patterns.
+ */
+ public static <K, V> StateSpec<MapState<K, V>> map() {
+ return new MapStateSpec<>(null, null);
+ }
+
+ /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */
+ public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
+ return new MapStateSpec<>(keyCoder, valueCoder);
+ }
+
+ /** Create a state spec for holding the watermark. */
+ public static
+ StateSpec<WatermarkHoldState> watermarkStateInternal(
+ TimestampCombiner timestampCombiner) {
+ return new WatermarkStateSpecInternal(timestampCombiner);
+ }
+
+ /**
+  * Converts a combining state spec into a bag spec over its accumulator type, reusing the
+  * combining spec's accumulator coder.
+  *
+  * <p>Only specs created through the {@code combining} factory methods of this class are
+  * supported.
+  *
+  * @throws IllegalArgumentException if {@code combiningSpec} is any other implementation
+  */
+ public static <InputT, AccumT, OutputT>
+     StateSpec<BagState<AccumT>> convertToBagSpecInternal(
+         StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+   if (combiningSpec instanceof CombiningStateSpec) {
+     // Safe: the instanceof check above guarantees the runtime class, and the type arguments
+     // are fixed by the declared type of combiningSpec.
+     @SuppressWarnings("unchecked")
+     CombiningStateSpec<InputT, AccumT, OutputT> plainSpec =
+         (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec;
+     return plainSpec.asBagSpec();
+   }
+
+   if (combiningSpec instanceof CombiningWithContextStateSpec) {
+     @SuppressWarnings("unchecked")
+     CombiningWithContextStateSpec<InputT, AccumT, OutputT> contextSpec =
+         (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec;
+     return contextSpec.asBagSpec();
+   }
+
+   throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
+ }
+
+ /**
+ * A specification for a state cell holding a settable value of type {@code T}.
+ *
+ * <p>Includes the coder for {@code T}.
+ */
+ private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
+
+ @Nullable
+ private Coder<T> coder;
+
+ private ValueStateSpec(@Nullable Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public ValueState<T> bind(String id, StateBinder visitor) {
+ return visitor.bindValue(id, this, coder);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void offerCoders(Coder[] coders) {
+ if (this.coder == null) {
+ if (coders[0] != null) {
+ this.coder = (Coder<T>) coders[0];
+ }
+ }
+ }
+
+ @Override public void finishSpecifying() {
+ if (coder == null) {
+ throw new IllegalStateException("Unable to infer a coder for ValueState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the"
+ + " Pipeline's CoderRegistry.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof ValueStateSpec)) {
+ return false;
+ }
+
+ ValueStateSpec<?> that = (ValueStateSpec<?>) obj;
+ return Objects.equals(this.coder, that.coder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), coder);
+ }
+ }
+
+ /**
+ * A specification for a state cell that is combined according to a {@link CombineFn}.
+ *
+ * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
+ */
+ private static class CombiningStateSpec<InputT, AccumT, OutputT>
+ implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
+
+ @Nullable
+ private Coder<AccumT> accumCoder;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+ private CombiningStateSpec(
+ @Nullable Coder<AccumT> accumCoder,
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.combineFn = combineFn;
+ this.accumCoder = accumCoder;
+ }
+
+ @Override
+ public CombiningState<InputT, AccumT, OutputT> bind(
+ String id, StateBinder visitor) {
+ return visitor.bindCombining(id, this, accumCoder, combineFn);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void offerCoders(Coder[] coders) {
+ if (this.accumCoder == null) {
+ if (coders[1] != null) {
+ this.accumCoder = (Coder<AccumT>) coders[1];
+ }
+ }
+ }
+
+ @Override public void finishSpecifying() {
+ if (accumCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for"
+ + " CombiningState and no Coder was specified."
+ + " Please set a coder by either invoking"
+ + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+ + " CombineFn<InputT, AccumT, OutputT> combineFn)"
+ + " or by registering the coder in the Pipeline's CoderRegistry.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof CombiningStateSpec)) {
+ return false;
+ }
+
+ CombiningStateSpec<?, ?, ?> that =
+ (CombiningStateSpec<?, ?, ?>) obj;
+ return Objects.equals(this.accumCoder, that.accumCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), accumCoder);
+ }
+
+ private StateSpec<BagState<AccumT>> asBagSpec() {
+ return new BagStateSpec<AccumT>(accumCoder);
+ }
+ }
+
+ /**
+  * A specification for a state cell that is combined according to a {@link
+  * CombineFnWithContext}.
+  *
+  * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
+  */
+ private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
+     implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
+
+   /** Accumulator coder; null until supplied at construction or through offerCoders. */
+   @Nullable private Coder<AccumT> accumCoder;
+   private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
+
+   private CombiningWithContextStateSpec(
+       @Nullable Coder<AccumT> accumCoder,
+       CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+     this.combineFn = combineFn;
+     this.accumCoder = accumCoder;
+   }
+
+   @Override
+   public CombiningState<InputT, AccumT, OutputT> bind(
+       String id, StateBinder visitor) {
+     return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
+   }
+
+   /**
+    * Adopts the offered accumulator coder only when none is set yet and the offered one is
+    * non-null.
+    *
+    * <p>Coders are indexed by type-argument order, so for
+    * {@code CombiningState<InputT, AccumT, OutputT>} the accumulator coder is at index 1, the
+    * same slot {@code CombiningStateSpec} reads. Index 2 is the {@code OutputT} coder and must
+    * not be used as the accumulator coder.
+    */
+   @SuppressWarnings("unchecked")
+   @Override
+   public void offerCoders(Coder[] coders) {
+     if (this.accumCoder == null && coders[1] != null) {
+       this.accumCoder = (Coder<AccumT>) coders[1];
+     }
+   }
+
+   /** Fails with {@link IllegalStateException} if no accumulator coder has been set. */
+   @Override
+   public void finishSpecifying() {
+     if (accumCoder == null) {
+       // The message names the real factory method, StateSpecs.combining(...).
+       throw new IllegalStateException(
+           "Unable to infer a coder for"
+               + " CombiningWithContextState and no Coder was specified."
+               + " Please set a coder by either invoking"
+               + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+               + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)"
+               + " or by registering the coder in the Pipeline's CoderRegistry.");
+     }
+   }
+
+   /** Equality compares the accumulator coder only; the combine function is not compared. */
+   @Override
+   public boolean equals(Object obj) {
+     if (obj == this) {
+       return true;
+     }
+
+     if (!(obj instanceof CombiningWithContextStateSpec)) {
+       return false;
+     }
+
+     CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj;
+     return Objects.equals(this.accumCoder, that.accumCoder);
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(getClass(), accumCoder);
+   }
+
+   /** Returns a bag spec over the accumulator type that reuses this spec's accumulator coder. */
+   private StateSpec<BagState<AccumT>> asBagSpec() {
+     return new BagStateSpec<AccumT>(accumCoder);
+   }
+ }
+
+ /**
+ * A specification for a state cell supporting for bag-like access patterns
+ * (frequent additions, occasional reads of all the values).
+ *
+ * <p>Includes the coder for the element type {@code T}</p>
+ */
+ private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
+
+ @Nullable
+ private Coder<T> elemCoder;
+
+ private BagStateSpec(@Nullable Coder<T> elemCoder) {
+ this.elemCoder = elemCoder;
+ }
+
+ @Override
+ public BagState<T> bind(String id, StateBinder visitor) {
+ return visitor.bindBag(id, this, elemCoder);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void offerCoders(Coder[] coders) {
+ if (this.elemCoder == null) {
+ if (coders[0] != null) {
+ this.elemCoder = (Coder<T>) coders[0];
+ }
+ }
+ }
+
+ @Override public void finishSpecifying() {
+ if (elemCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for BagState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the"
+ + " Pipeline's CoderRegistry.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof BagStateSpec)) {
+ return false;
+ }
+
+ BagStateSpec<?> that = (BagStateSpec<?>) obj;
+ return Objects.equals(this.elemCoder, that.elemCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), elemCoder);
+ }
+ }
+
+ private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
+
+ @Nullable
+ private Coder<K> keyCoder;
+ @Nullable
+ private Coder<V> valueCoder;
+
+ private MapStateSpec(@Nullable Coder<K> keyCoder, @Nullable Coder<V> valueCoder) {
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ }
+
+ @Override
+ public MapState<K, V> bind(String id, StateBinder visitor) {
+ return visitor.bindMap(id, this, keyCoder, valueCoder);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void offerCoders(Coder[] coders) {
+ if (this.keyCoder == null) {
+ if (coders[0] != null) {
+ this.keyCoder = (Coder<K>) coders[0];
+ }
+ }
+ if (this.valueCoder == null) {
+ if (coders[1] != null) {
+ this.valueCoder = (Coder<V>) coders[1];
+ }
+ }
+ }
+
+ @Override public void finishSpecifying() {
+ if (keyCoder == null || valueCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for MapState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the"
+ + " coder in the Pipeline's CoderRegistry.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof MapStateSpec)) {
+ return false;
+ }
+
+ MapStateSpec<?, ?> that = (MapStateSpec<?, ?>) obj;
+ return Objects.equals(this.keyCoder, that.keyCoder)
+ && Objects.equals(this.valueCoder, that.valueCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), keyCoder, valueCoder);
+ }
+ }
+
+ /**
+ * A specification for a state cell supporting for set-like access patterns.
+ *
+ * <p>Includes the coder for the element type {@code T}</p>
+ */
+ private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
+
+ @Nullable
+ private Coder<T> elemCoder;
+
+ private SetStateSpec(@Nullable Coder<T> elemCoder) {
+ this.elemCoder = elemCoder;
+ }
+
+ @Override
+ public SetState<T> bind(String id, StateBinder visitor) {
+ return visitor.bindSet(id, this, elemCoder);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void offerCoders(Coder[] coders) {
+ if (this.elemCoder == null) {
+ if (coders[0] != null) {
+ this.elemCoder = (Coder<T>) coders[0];
+ }
+ }
+ }
+
+ @Override public void finishSpecifying() {
+ if (elemCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for SetState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the"
+ + " Pipeline's CoderRegistry.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof SetStateSpec)) {
+ return false;
+ }
+
+ SetStateSpec<?> that = (SetStateSpec<?>) obj;
+ return Objects.equals(this.elemCoder, that.elemCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), elemCoder);
+ }
+ }
+
+ /**
+  * Spec for a state cell that tracks a combined watermark hold.
+  *
+  * <p>Carries the {@link TimestampCombiner} according to which the output times are combined.
+  */
+ private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
+
+   /**
+    * Determines how multiple output times added to hold the watermark are combined, and the
+    * behavior when merging windows. Deliberately excluded from equality and hashing, since
+    * there is at most one watermark hold spec per computation.
+    */
+   private final TimestampCombiner timestampCombiner;
+
+   private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
+     this.timestampCombiner = timestampCombiner;
+   }
+
+   @Override
+   public WatermarkHoldState bind(String id, StateBinder visitor) {
+     return visitor.bindWatermark(id, this, timestampCombiner);
+   }
+
+   @Override
+   public void offerCoders(Coder[] coders) {
+     // Nothing to do: this spec holds no coders.
+   }
+
+   @Override
+   public void finishSpecifying() {
+     // Nothing to validate: this spec holds no coders.
+   }
+
+   /** All instances of this spec are considered equal to one another. */
+   @Override
+   public boolean equals(Object obj) {
+     return obj == this || obj instanceof WatermarkStateSpecInternal;
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(getClass());
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java
new file mode 100644
index 0000000..ca97db2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * State holding a single value.
+ *
+ * @param <T> The type of values being stored.
+ */
+@Experimental(Kind.STATE)
+public interface ValueState<T> extends ReadableState<T>, State {
+ /**
+ * Set the value of the buffer.
+ *
+ * @param input the value to store
+ */
+ void write(T input);
+
+ /** Overridden to narrow the return type to {@code ValueState<T>}. */
+ @Override
+ ValueState<T> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
new file mode 100644
index 0000000..9f6c203
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * A {@link State} accepting and aggregating output timestamps, which determines the time to which
+ * the output watermark must be held.
+ *
+ * <p><b><i>For internal use only. This API may change at any time.</i></b>
+ */
+@Experimental(Kind.STATE)
+public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
+ /**
+ * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
+ * given an element timestamp, and to combine watermarks from windows which are about to be
+ * merged.
+ */
+ TimestampCombiner getTimestampCombiner();
+
+ /** Overridden to narrow the return type to {@code WatermarkHoldState}. */
+ @Override
+ WatermarkHoldState readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java
new file mode 100644
index 0000000..de5eeeb
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines internal utilities for interacting with pipeline state.
+ */
+package org.apache.beam.sdk.state;
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
index 9bced41..585d8b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.testing;
-import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.state.MapState;
/**
* Category tag for validation tests which utilize {@link MapState}.
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
index 6fd74bd..7d82d22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.testing;
-import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.state.SetState;
/**
* Category tag for validation tests which utilize {@link SetState}.
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index f3d178e..c858936 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
@@ -37,8 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index ca7427c..ead2569 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -320,7 +320,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public org.apache.beam.sdk.util.state.State state(String stateId) {
+ public org.apache.beam.sdk.state.State state(String stateId) {
throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index e132115..6828979 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -23,16 +23,16 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 7dd2cdd..c45311a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 3c44afe..d5a1a94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
@@ -28,7 +29,6 @@ import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.state.State;
/**
* Interface for invoking the {@code DoFn} processing methods.
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 3219f96..72ad4b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
@@ -38,8 +40,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index bac3bef..3dfca8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -42,6 +42,8 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
@@ -58,8 +60,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.common.ReflectHelpers;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index 31d1f64..f93cb0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.values.PCollectionView;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index a394180..1b1c352 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -23,12 +23,12 @@ import java.io.ObjectOutputStream;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.state.StateContext;
/**
* Static utility methods that create combine function instances.
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
deleted file mode 100644
index e0eebe5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-/**
- * State containing a bag of values. Items can be added to the bag and the contents read out.
- *
- * @param <T> The type of elements in the bag.
- */
-public interface BagState<T> extends GroupingState<T, Iterable<T>> {
- @Override
- BagState<T> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
deleted file mode 100644
index 80e4dc9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link GroupingState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, AccumT, OutputT>
- extends GroupingState<InputT, OutputT> {
-
- /**
- * Read the merged accumulator for this combining value. It is implied that reading the
- * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for
- * this.
- */
- AccumT getAccum();
-
- /**
- * Add an accumulator to this combining value. Depending on implementation this may immediately
- * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
- */
- void addAccum(AccumT accum);
-
- /**
- * Merge the given accumulators according to the underlying combiner.
- */
- AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
- @Override
- CombiningState<InputT, AccumT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
deleted file mode 100644
index bd7a8d9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
- /**
- * Add a value to the buffer.
- */
- void add(InputT value);
-
- /**
- * Return true if this state is empty.
- */
- ReadableState<Boolean> isEmpty();
-
- @Override
- GroupingState<InputT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
deleted file mode 100644
index fb7e807..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import java.util.Map;
-
-/**
- * An object that maps keys to values.
- * A map cannot contain duplicate keys;
- * each key can map to at most one value.
- *
- * @param <K> the type of keys maintained by this map
- * @param <V> the type of mapped values
- */
-public interface MapState<K, V> extends State {
-
- /**
- * Associates the specified value with the specified key in this state.
- */
- void put(K key, V value);
-
- /**
- * A deferred read-followed-by-write.
- *
- * <p>When {@code read()} is called on the result or state is committed, it forces a read of the
- * map and reconciliation with any pending modifications.
- *
- * <p>If the specified key is not already associated with a value (or is mapped to {@code null})
- * associates it with the given value and returns {@code null}, else returns the current value.
- */
- ReadableState<V> putIfAbsent(K key, V value);
-
- /**
- * Removes the mapping for a key from this map if it is present.
- */
- void remove(K key);
-
- /**
- * A deferred lookup.
- *
- * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()}
- * on the results.
- *
- * <p>When {@code read()} is called, a particular state implementation is encouraged to perform
- * all pending reads in a single batch.
- */
- ReadableState<V> get(K key);
-
- /**
- * Returns an iterable view of the keys contained in this map.
- */
- ReadableState<Iterable<K>> keys();
-
- /**
- * Returns an iterable view of the values contained in this map.
- */
- ReadableState<Iterable<V>> values();
-
- /**
- * Returns an iterable view of all key-values.
- */
- ReadableState<Iterable<Map.Entry<K, V>>> entries();
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
deleted file mode 100644
index c3e9936..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * {@link State} that can be read via {@link #read()}.
- *
- * <p>Use {@link #readLater()} for marking several states for prefetching. Runners
- * can potentially batch these into one read.
- *
- * @param <T> The type of value returned by {@link #read}.
- */
-@Experimental(Kind.STATE)
-public interface ReadableState<T> {
- /**
- * Read the current value, blocking until it is available.
- *
- * <p>If there will be many calls to {@link #read} for different state in short succession,
- * you should first call {@link #readLater} for all of them so the reads can potentially be
- * batched (depending on the underlying implementation).
- */
- T read();
-
- /**
- * Indicate that the value will be read later.
- *
- * <p>This allows an implementation to start an asynchronous prefetch or
- * to include this state in the next batch of reads.
- *
- * @return this for convenient chaining
- */
- ReadableState<T> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
deleted file mode 100644
index 819eda6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * Utilities for constructing and manipulating {@link ReadableState} instances.
- */
-@Experimental(Kind.STATE)
-public class ReadableStates {
-
- /**
- * A {@link ReadableState} constructed from a constant value, hence immediately available.
- */
- public static <T> ReadableState<T> immediate(final T value) {
- return new ReadableState<T>() {
- @Override
- public T read() {
- return value;
- }
-
- @Override
- public ReadableState<T> readLater() {
- return this;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
deleted file mode 100644
index 56ea510..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-/**
- * State containing no duplicate elements.
- * Items can be added to the set and the contents read out.
- *
- * @param <T> The type of elements in the set.
- */
-public interface SetState<T> extends GroupingState<T, Iterable<T>> {
- /**
- * Returns true if this set contains the specified element.
- */
- ReadableState<Boolean> contains(T t);
-
- /**
- * Ensures a value is a member of the set, returning {@code true} if it was added and {@code
- * false} otherwise.
- */
- ReadableState<Boolean> addIfAbsent(T t);
-
- /**
- * Removes the specified element from this set if it is present.
- */
- void remove(T t);
-
- @Override
- SetState<T> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
deleted file mode 100644
index 3a49f01..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-/**
- * Base interface for all state locations.
- *
- * <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
- */
-public interface State {
-
- /**
- * Clear out the state location.
- */
- void clear();
-}