You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/12 17:18:48 UTC
[1/2] incubator-beam git commit: Refactor StateSpec out of StateTag
Repository: incubator-beam
Updated Branches:
refs/heads/master 142229e37 -> bc9ed7dbd
Refactor StateSpec out of StateTag
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c1ba2e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c1ba2e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c1ba2e1
Branch: refs/heads/master
Commit: 7c1ba2e1062556ac98b29f5bb4f5b75a7e7832e2
Parents: 135790b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 20:50:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 11 20:27:12 2016 -0700
----------------------------------------------------------------------
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../apache/beam/sdk/util/state/StateBinder.java | 67 +++
.../apache/beam/sdk/util/state/StateSpec.java | 39 ++
.../apache/beam/sdk/util/state/StateSpecs.java | 452 +++++++++++++++++++
.../apache/beam/sdk/util/state/StateTag.java | 82 ++--
.../apache/beam/sdk/util/state/StateTags.java | 386 ++--------------
6 files changed, 655 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 3c01690..c9223a7 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -116,7 +116,7 @@
<!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
</Match>
<Match>
- <Class name="org.apache.beam.sdk.util.state.StateTags$CombiningValueStateTag"/>
+ <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/>
<Method name="equals"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
<!--[BEAM-421] Class doesn't override equals in superclass-->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
new file mode 100644
index 0000000..0521e15
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+
+/**
+ * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
+ *
+ * @param <K> the type of key this binder embodies.
+ */
+public interface StateBinder<K> {
+ <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder);
+
+ <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder);
+
+ <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ String id,
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
+
+ <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ String id,
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
+
+ <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ String id,
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
+ combineFn);
+
+ /**
+ * Bind to a watermark {@link StateSpec}.
+ *
+ * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+ * the returned {@link WatermarkHoldState} are to be combined.
+ */
+ <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ String id,
+ StateSpec<? super K, WatermarkHoldState<W>> spec,
+ OutputTimeFn<? super W> outputTimeFn);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
new file mode 100644
index 0000000..4fdeefb
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A specification of a persistent state cell. This includes information necessary to encode the
+ * value and details about the intended access pattern.
+ *
+ * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
+ * accept values of type {@code StateSpec<? super K, StateT>}.
+ * @param <StateT> The type of state being described.
+ */
+@Experimental(Kind.STATE)
+public interface StateSpec<K, StateT extends State> extends Serializable {
+
+ /**
+ * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
+ */
+ StateT bind(String id, StateBinder<? extends K> binder);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
new file mode 100644
index 0000000..db0eec6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+
+/**
+ * Static utility methods for creating {@link StateSpec} instances.
+ */
+@Experimental(Kind.STATE)
+public class StateSpecs {
+
+ private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
+
+ static {
+ STANDARD_REGISTRY.registerStandardCoders();
+ }
+
+ private StateSpecs() {}
+
+ /** Create a simple state spec for values of type {@code T}. */
+ public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) {
+ return new ValueStateSpec<>(valueCoder);
+ }
+
+ /**
+ * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+ * {@code InputT}s into a single {@code OutputT}.
+ */
+ public static <InputT, AccumT, OutputT>
+ StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+ Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ return combiningValueInternal(accumCoder, combineFn);
+ }
+
+ /**
+ * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge
+ * multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link
+ * KeyedCombineFn} comes from the keyed {@link StateAccessor}.
+ */
+ public static <K, InputT, AccumT, OutputT>
+ StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+ return keyedCombiningValueInternal(accumCoder, combineFn);
+ }
+
+ /**
+ * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically
+ * merge multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link
+ * KeyedCombineFn} comes from the keyed {@link StateAccessor}, the context provided comes from the
+ * {@link StateContext}.
+ */
+ public static <K, InputT, AccumT, OutputT>
+ StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ keyedCombiningValueWithContext(
+ Coder<AccumT> accumCoder,
+ KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+ return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(
+ accumCoder, combineFn);
+ }
+
+ /**
+ * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+ * {@code InputT}s into a single {@code OutputT}.
+ *
+ * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should
+ * only be used to initialize static values.
+ */
+ public static <InputT, AccumT, OutputT>
+ StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ combiningValueFromInputInternal(
+ Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ try {
+ Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
+ return combiningValueInternal(accumCoder, combineFn);
+ } catch (CannotProvideCoderException e) {
+ throw new IllegalArgumentException(
+ "Unable to determine accumulator coder for "
+ + combineFn.getClass().getSimpleName()
+ + " from "
+ + inputCoder,
+ e);
+ }
+ }
+
+ private static <InputT, AccumT, OutputT>
+ StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+ Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+ }
+
+ private static <K, InputT, AccumT, OutputT>
+ StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+ Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+ return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+ }
+
+ /**
+ * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
+ * all the values that have been added.
+ */
+ public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) {
+ return new BagStateSpec<T>(elemCoder);
+ }
+
+ /** Create a state spec for holding the watermark. */
+ public static <W extends BoundedWindow>
+ StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
+ OutputTimeFn<? super W> outputTimeFn) {
+ return new WatermarkStateSpecInternal<W>(outputTimeFn);
+ }
+
+ public static <K, InputT, AccumT, OutputT>
+ StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+ if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
+ // Checked above; conversion to a bag spec depends on the provided spec being one of those
+ // created via the factory methods in this class.
+ @SuppressWarnings("unchecked")
+ KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+ (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ return typedSpec.asBagSpec();
+ } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) {
+ @SuppressWarnings("unchecked")
+ KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+ (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ return typedSpec.asBagSpec();
+ } else {
+ throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
+ }
+ }
+
+ /**
+ * A specification for a state cell holding a settable value of type {@code T}.
+ *
+ * <p>Includes the coder for {@code T}.
+ */
+ private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> {
+
+ private final Coder<T> coder;
+
+ private ValueStateSpec(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public ValueState<T> bind(String id, StateBinder<?> visitor) {
+ return visitor.bindValue(id, this, coder);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof ValueStateSpec)) {
+ return false;
+ }
+
+ ValueStateSpec<?> that = (ValueStateSpec<?>) obj;
+ return Objects.equals(this.coder, that.coder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), coder);
+ }
+ }
+
+ /**
+ * A specification for a state cell that is combined according to a {@link CombineFn}.
+ *
+ * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
+ */
+ private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
+ extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
+ implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+
+ private final Coder<AccumT> accumCoder;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+ private CombiningValueStateSpec(
+ Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ super(accumCoder, combineFn.asKeyedFn());
+ this.combineFn = combineFn;
+ this.accumCoder = accumCoder;
+ }
+ }
+
+ /**
+ * A specification for a state cell that is combined according to a
+ * {@link KeyedCombineFnWithContext}.
+ *
+ * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
+ */
+ private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
+ implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+
+ private final Coder<AccumT> accumCoder;
+ private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
+
+ protected KeyedCombiningValueWithContextStateSpec(
+ Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+ this.combineFn = combineFn;
+ this.accumCoder = accumCoder;
+ }
+
+ @Override
+ public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+ String id, StateBinder<? extends K> visitor) {
+ return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) {
+ return false;
+ }
+
+ KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that =
+ (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj;
+ return Objects.equals(this.accumCoder, that.accumCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), accumCoder);
+ }
+
+ private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+ return new BagStateSpec<AccumT>(accumCoder);
+ }
+ }
+
+ /**
+ * A specification for a state cell that is combined according to a {@link KeyedCombineFn}.
+ *
+ * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
+ */
+ private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
+ implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+
+ private final Coder<AccumT> accumCoder;
+ private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+ protected KeyedCombiningValueStateSpec(
+ Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+ this.keyedCombineFn = keyedCombineFn;
+ this.accumCoder = accumCoder;
+ }
+
+ @Override
+ public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+ String id, StateBinder<? extends K> visitor) {
+ return visitor.bindKeyedCombiningValue(id, this, accumCoder, keyedCombineFn);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof CombiningValueStateSpec)) {
+ return false;
+ }
+
+ KeyedCombiningValueStateSpec<?, ?, ?, ?> that =
+ (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj;
+ return Objects.equals(this.accumCoder, that.accumCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), accumCoder);
+ }
+
+ private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+ return new BagStateSpec<AccumT>(accumCoder);
+ }
+ }
+
+ /**
+ * A specification for a state cell supporting for bag-like access patterns
+ * (frequent additions, occasional reads of all the values).
+ *
+ * <p>Includes the coder for the element type {@code T}</p>
+ */
+ private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> {
+
+ private final Coder<T> elemCoder;
+
+ private BagStateSpec(Coder<T> elemCoder) {
+ this.elemCoder = elemCoder;
+ }
+
+ @Override
+ public BagState<T> bind(String id, StateBinder<?> visitor) {
+ return visitor.bindBag(id, this, elemCoder);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof BagStateSpec)) {
+ return false;
+ }
+
+ BagStateSpec<?> that = (BagStateSpec<?>) obj;
+ return Objects.equals(this.elemCoder, that.elemCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), elemCoder);
+ }
+ }
+
+ /**
+ * A specification for a state cell tracking a combined watermark hold.
+ *
+ * <p>Includes the {@link OutputTimeFn} according to which the output times
+ * are combined.
+ */
+ private static class WatermarkStateSpecInternal<W extends BoundedWindow>
+ implements StateSpec<Object, WatermarkHoldState<W>> {
+
+ /**
+ * When multiple output times are added to hold the watermark, this determines how they are
+ * combined, and also the behavior when merging windows. Does not contribute to equality/hash
+ * since we have at most one watermark hold spec per computation.
+ */
+ private final OutputTimeFn<? super W> outputTimeFn;
+
+ private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
+ this.outputTimeFn = outputTimeFn;
+ }
+
+ @Override
+ public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
+ return visitor.bindWatermark(id, this, outputTimeFn);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ // All instance of WatermarkHoldState are considered equal
+ return obj instanceof WatermarkStateSpecInternal;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass());
+ }
+ }
+
+ /**
+ * @deprecated for migration purposes only
+ */
+ @Deprecated
+ public static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) {
+ return new StateBinder<K>() {
+ @Override
+ public <T> ValueState<T> bindValue(
+ String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) {
+ return binder.bindValue(StateTags.tagForSpec(id, spec), coder);
+ }
+
+ @Override
+ public <T> BagState<T> bindBag(
+ String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) {
+ return binder.bindBag(StateTags.tagForSpec(id, spec), elemCoder);
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ String id,
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
+ return binder.bindCombiningValue(StateTags.tagForSpec(id, spec), accumCoder, combineFn);
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ String id,
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+ return binder.bindKeyedCombiningValue(
+ StateTags.tagForSpec(id, spec), accumCoder, combineFn);
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ String id,
+ StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+ return binder.bindKeyedCombiningValueWithContext(
+ StateTags.tagForSpec(id, spec), accumCoder, combineFn);
+ }
+
+ @Override
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ String id,
+ StateSpec<? super K, WatermarkHoldState<W>> spec,
+ OutputTimeFn<? super W> outputTimeFn) {
+ return binder.bindWatermark(StateTags.tagForSpec(id, spec), outputTimeFn);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
index 94cba2f..feca927 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
@@ -30,8 +30,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
/**
- * An address for persistent state. This includes a unique identifier for the location, the
- * information necessary to encode the value, and details about the intended access pattern.
+ * An address and specification for a persistent state cell. This includes a unique identifier for
+ * the location, the information necessary to encode the value, and details about the intended
+ * access pattern.
*
* <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column
* that has cells of type {@code StateT}.
@@ -45,53 +46,66 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
@Experimental(Kind.STATE)
public interface StateTag<K, StateT extends State> extends Serializable {
+ /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
+ void appendTo(Appendable sb) throws IOException;
+
+ /**
+ * An identifier for the state cell that this tag references.
+ */
+ String getId();
+
+ /**
+ * The specification for the state stored in the referenced cell.
+ */
+ StateSpec<K, StateT> getSpec();
+
+ /**
+ * Bind this state tag. See {@link StateSpec#bind}.
+ *
+ * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now.
+ */
+ @Deprecated
+ StateT bind(StateBinder<? extends K> binder);
+
/**
- * Visitor for binding a {@link StateTag} and to the associated {@link State}.
+ * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
*
* @param <K> the type of key this binder embodies.
+ * @deprecated for migration only; runners should reference the top level {@link StateBinder}
+ * and move towards {@link StateSpec} rather than {@link StateTag}.
*/
+ @Deprecated
public interface StateBinder<K> {
- <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder);
+ <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder);
- <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder);
+ <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder);
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn);
+ <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineFn<InputT, AccumT, OutputT> combineFn);
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
+ <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ <InputT, AccumT, OutputT>
+ AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn);
+ KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
+ combineFn);
/**
- * Bind to a watermark {@link StateTag}.
+ * Bind to a watermark {@link StateSpec}.
*
- * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps
- * added to the returned {@link WatermarkHoldState} are to be combined.
+ * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+ * the returned {@link WatermarkHoldState} are to be combined.
*/
<W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
+ StateTag<? super K, WatermarkHoldState<W>> spec,
OutputTimeFn<? super W> outputTimeFn);
}
-
- /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
- void appendTo(Appendable sb) throws IOException;
-
- /**
- * Returns the user-provided name of this state cell.
- */
- String getId();
-
- /**
- * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
- */
- StateT bind(StateBinder<? extends K> binder);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
index b0797b6..3c12848 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
@@ -23,7 +23,6 @@ import java.io.Serializable;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -61,11 +60,17 @@ public class StateTags {
StateTag<K, StateT> asKind(StateKind kind);
}
+ /** Create a state tag for the given id and spec. */
+ public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
+ String id, StateSpec<K, StateT> spec) {
+ return new SimpleStateTag<>(new StructuredId(id), spec);
+ }
+
/**
* Create a simple state tag for values of type {@code T}.
*/
public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
- return new ValueStateTag<>(new StructuredId(id), valueCoder);
+ return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder));
}
/**
@@ -76,7 +81,8 @@ public class StateTags {
StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
combiningValue(
String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return combiningValueInternal(id, accumCoder, combineFn);
+ return new SimpleStateTag<>(
+ new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
}
/**
@@ -88,7 +94,8 @@ public class StateTags {
OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
keyedCombiningValue(String id, Coder<AccumT> accumCoder,
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return keyedCombiningValueInternal(id, accumCoder, combineFn);
+ return new SimpleStateTag<>(
+ new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
}
/**
@@ -103,10 +110,8 @@ public class StateTags {
String id,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>(
- new StructuredId(id),
- accumCoder,
- combineFn);
+ return new SimpleStateTag<>(
+ new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
}
/**
@@ -120,32 +125,8 @@ public class StateTags {
StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
combiningValueFromInputInternal(
String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- try {
- Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
- return combiningValueInternal(id, accumCoder, combineFn);
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(
- "Unable to determine accumulator coder for " + combineFn.getClass().getSimpleName()
- + " from " + inputCoder, e);
- }
- }
-
- private static <InputT, AccumT,
- OutputT> StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- combiningValueInternal(
- String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return
- new CombiningValueStateTag<InputT, AccumT, OutputT>(
- new StructuredId(id), accumCoder, combineFn);
- }
-
- private static <K, InputT, AccumT, OutputT>
- StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
- String id,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>(
- new StructuredId(id), accumCoder, combineFn);
+ return new SimpleStateTag<>(
+ new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
}
/**
@@ -153,7 +134,7 @@ public class StateTags {
* occasionally retrieving all the values that have been added.
*/
public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
- return new BagStateTag<T>(new StructuredId(id), elemCoder);
+ return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder));
}
/**
@@ -161,7 +142,8 @@ public class StateTags {
*/
public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
- return new WatermarkStateTagInternal<W>(new StructuredId(id), outputTimeFn);
+ return new SimpleStateTag<>(
+ new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
}
/**
@@ -171,7 +153,7 @@ public class StateTags {
public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
StateTag<K, StateT> tag) {
if (!(tag instanceof SystemStateTag)) {
- throw new IllegalArgumentException("Expected subclass of StateTagBase, got " + tag);
+ throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
}
// Checked above
@SuppressWarnings("unchecked")
@@ -182,21 +164,9 @@ public class StateTags {
public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
convertToBagTagInternal(
StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
- if (combiningTag instanceof KeyedCombiningValueStateTag) {
- // Checked above; conversion to a bag tag depends on the provided tag being one of those
- // created via the factory methods in this class.
- @SuppressWarnings("unchecked")
- KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT> typedTag =
- (KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>) combiningTag;
- return typedTag.asBagTag();
- } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) {
- @SuppressWarnings("unchecked")
- KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT> typedTag =
- (KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>) combiningTag;
- return typedTag.asBagTag();
- } else {
- throw new IllegalArgumentException("Unexpected StateTag " + combiningTag);
- }
+ return new SimpleStateTag<>(
+ new StructuredId(combiningTag.getId()),
+ StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
}
private static class StructuredId implements Serializable {
@@ -254,15 +224,24 @@ public class StateTags {
}
/**
- * A base class that just manages the structured ids.
+ * A basic {@link StateTag} implementation that manages the structured ids.
*/
- private abstract static class StateTagBase<K, StateT extends State>
+ private static class SimpleStateTag<K, StateT extends State>
implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
- protected final StructuredId id;
+ private final StateSpec<K, StateT> spec;
+ private final StructuredId id;
- protected StateTagBase(StructuredId id) {
+ public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
this.id = id;
+ this.spec = spec;
+ }
+
+ @Override
+ @Deprecated
+ public StateT bind(StateTag.StateBinder<? extends K> binder) {
+ return spec.bind(
+ this.id.getRawId(), StateSpecs.adaptTagBinder(binder));
}
@Override
@@ -271,6 +250,11 @@ public class StateTags {
}
@Override
+ public StateSpec<K, StateT> getSpec() {
+ return spec;
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", id)
@@ -283,298 +267,24 @@ public class StateTags {
}
@Override
- public abstract StateTag<K, StateT> asKind(StateKind kind);
- }
-
- /**
- * A value state cell for values of type {@code T}.
- *
- * @param <T> the type of value being stored
- */
- private static class ValueStateTag<T> extends StateTagBase<Object, ValueState<T>>
- implements StateTag<Object, ValueState<T>> {
-
- private final Coder<T> coder;
-
- private ValueStateTag(StructuredId id, Coder<T> coder) {
- super(id);
- this.coder = coder;
- }
-
- @Override
- public ValueState<T> bind(StateBinder<?> visitor) {
- return visitor.bindValue(this, coder);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof ValueStateTag)) {
- return false;
- }
-
- ValueStateTag<?> that = (ValueStateTag<?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.coder, that.coder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, coder);
- }
-
- @Override
- public StateTag<Object, ValueState<T>> asKind(StateKind kind) {
- return new ValueStateTag<T>(id.asKind(kind), coder);
- }
- }
-
- /**
- * A state cell for values that are combined according to a {@link CombineFn}.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- private static class CombiningValueStateTag<InputT, AccumT, OutputT>
- extends KeyedCombiningValueStateTag<Object, InputT, AccumT, OutputT>
- implements StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>,
- SystemStateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
- private final Coder<AccumT> accumCoder;
- private final CombineFn<InputT, AccumT, OutputT> combineFn;
-
- private CombiningValueStateTag(
- StructuredId id,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- super(id, accumCoder, combineFn.asKeyedFn());
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- asKind(StateKind kind) {
- return new CombiningValueStateTag<InputT, AccumT, OutputT>(
- id.asKind(kind), accumCoder, combineFn);
- }
- }
-
- /**
- * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}.
- *
- * @param <K> the type of keys
- * @param <InputT> the type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- private static class KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>
- extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
- private final Coder<AccumT> accumCoder;
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-
- protected KeyedCombiningValueWithContextStateTag(
- StructuredId id,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- super(id);
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
- StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof KeyedCombiningValueWithContextStateTag)) {
- return false;
- }
-
- KeyedCombiningValueWithContextStateTag<?, ?, ?, ?> that =
- (KeyedCombiningValueWithContextStateTag<?, ?, ?, ?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.accumCoder, that.accumCoder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, accumCoder);
- }
-
- @Override
- public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
- StateKind kind) {
- return new KeyedCombiningValueWithContextStateTag<>(
- id.asKind(kind), accumCoder, combineFn);
- }
-
- private StateTag<Object, BagState<AccumT>> asBagTag() {
- return new BagStateTag<AccumT>(id, accumCoder);
- }
- }
-
- /**
- * A state cell for values that are combined according to a {@link KeyedCombineFn}.
- *
- * @param <K> the type of keys
- * @param <InputT> the type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- private static class KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>
- extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
- private final Coder<AccumT> accumCoder;
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- protected KeyedCombiningValueStateTag(
- StructuredId id,
- Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- super(id);
- this.keyedCombineFn = keyedCombineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
- StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn);
+ public StateTag<K, StateT> asKind(StateKind kind) {
+ return new SimpleStateTag<>(id.asKind(kind), spec);
}
@Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof CombiningValueStateTag)) {
+ public boolean equals(Object other) {
+ if (!(other instanceof SimpleStateTag)) {
return false;
}
- KeyedCombiningValueStateTag<?, ?, ?, ?> that = (KeyedCombiningValueStateTag<?, ?, ?, ?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.accumCoder, that.accumCoder);
+ SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other;
+ return Objects.equals(this.getId(), otherTag.getId())
+ && Objects.equals(this.getSpec(), otherTag.getSpec());
}
@Override
public int hashCode() {
- return Objects.hash(getClass(), id, accumCoder);
- }
-
- @Override
- public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
- StateKind kind) {
- return new KeyedCombiningValueStateTag<>(id.asKind(kind), accumCoder, keyedCombineFn);
- }
-
- private StateTag<Object, BagState<AccumT>> asBagTag() {
- return new BagStateTag<AccumT>(id, accumCoder);
- }
- }
-
- /**
- * A state cell optimized for bag-like access patterns (frequent additions, occasional reads
- * of all the values).
- *
- * @param <T> the type of value in the bag
- */
- private static class BagStateTag<T> extends StateTagBase<Object, BagState<T>>
- implements StateTag<Object, BagState<T>>{
-
- private final Coder<T> elemCoder;
-
- private BagStateTag(StructuredId id, Coder<T> elemCoder) {
- super(id);
- this.elemCoder = elemCoder;
- }
-
- @Override
- public BagState<T> bind(StateBinder<?> visitor) {
- return visitor.bindBag(this, elemCoder);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof BagStateTag)) {
- return false;
- }
-
- BagStateTag<?> that = (BagStateTag<?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.elemCoder, that.elemCoder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, elemCoder);
- }
-
- @Override
- public StateTag<Object, BagState<T>> asKind(StateKind kind) {
- return new BagStateTag<>(id.asKind(kind), elemCoder);
- }
- }
-
- private static class WatermarkStateTagInternal<W extends BoundedWindow>
- extends StateTagBase<Object, WatermarkHoldState<W>> {
-
- /**
- * When multiple output times are added to hold the watermark, this determines how they are
- * combined, and also the behavior when merging windows. Does not contribute to equality/hash
- * since we have at most one watermark hold tag per computation.
- */
- private final OutputTimeFn<? super W> outputTimeFn;
-
- private WatermarkStateTagInternal(StructuredId id, OutputTimeFn<? super W> outputTimeFn) {
- super(id);
- this.outputTimeFn = outputTimeFn;
- }
-
- @Override
- public WatermarkHoldState<W> bind(StateBinder<?> visitor) {
- return visitor.bindWatermark(this, outputTimeFn);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof WatermarkStateTagInternal)) {
- return false;
- }
-
- WatermarkStateTagInternal<?> that = (WatermarkStateTagInternal<?>) obj;
- return Objects.equals(this.id, that.id);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id);
- }
-
- @Override
- public StateTag<Object, WatermarkHoldState<W>> asKind(StateKind kind) {
- return new WatermarkStateTagInternal<W>(id.asKind(kind), outputTimeFn);
+ return Objects.hash(getClass(), this.getId(), this.getSpec());
}
}
}
[2/2] incubator-beam git commit: This closes #1044
Posted by ke...@apache.org.
This closes #1044
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc9ed7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc9ed7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc9ed7db
Branch: refs/heads/master
Commit: bc9ed7dbd9da9f7addc365ad511b106cfcc69b01
Parents: 142229e 7c1ba2e
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 12 10:17:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 12 10:17:28 2016 -0700
----------------------------------------------------------------------
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../apache/beam/sdk/util/state/StateBinder.java | 67 +++
.../apache/beam/sdk/util/state/StateSpec.java | 39 ++
.../apache/beam/sdk/util/state/StateSpecs.java | 452 +++++++++++++++++++
.../apache/beam/sdk/util/state/StateTag.java | 82 ++--
.../apache/beam/sdk/util/state/StateTags.java | 386 ++--------------
6 files changed, 655 insertions(+), 373 deletions(-)
----------------------------------------------------------------------