You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:36 UTC
[36/53] [abbrv] beam git commit: jstorm-runner: Fix the failure of session window test cases in CombineTest
jstorm-runner: Fix the failure of session window test cases in CombineTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52913b7e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52913b7e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52913b7e
Branch: refs/heads/jstorm-runner
Commit: 52913b7e2b01b4e6c65d96a10d745dd3e6739c83
Parents: 201ef72
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Thu Jul 20 14:37:29 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800
----------------------------------------------------------------------
.../jstorm/translation/FlattenTranslator.java | 1 -
.../translation/JStormStateInternals.java | 188 +++++++++++++++++--
2 files changed, 176 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index 8f239bf..e104ad8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 68a17e5..90ef6d2 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
@@ -93,13 +94,14 @@ class JStormStateInternals<K> implements StateInternals {
}
@Override
- public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+ public <T extends State> T state(final StateNamespace namespace, final StateTag<T> address) {
return address.getSpec().bind(address.getId(), new StateBinder() {
@Override
public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
try {
return new JStormValueState<>(
- getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+ getStoreId(id), spec, getKey(), namespace,
+ kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
} catch (IOException e) {
throw new RuntimeException();
}
@@ -109,7 +111,8 @@ class JStormStateInternals<K> implements StateInternals {
public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
try {
return new JStormBagState(
- getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+ getStoreId(id), spec, getKey(), namespace,
+ kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
} catch (IOException e) {
throw new RuntimeException();
@@ -129,7 +132,8 @@ class JStormStateInternals<K> implements StateInternals {
Coder<ValueT> mapValueCoder) {
try {
return new JStormMapState<>(
- getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+ getStoreId(id), spec, (KeyT) getKey(), namespace,
+ kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -143,10 +147,11 @@ class JStormStateInternals<K> implements StateInternals {
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
BagState<AccumT> accumBagState = new JStormBagState(
- getKey(), namespace,
+ getStoreId(id), StateSpecs.<InputT>bag(), getKey(), namespace,
kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
- return new JStormCombiningState<>(accumBagState, combineFn);
+ return new JStormCombiningState<>(
+ id, spec, namespace, accumBagState, combineFn);
} catch (IOException e) {
throw new RuntimeException();
}
@@ -169,7 +174,7 @@ class JStormStateInternals<K> implements StateInternals {
final TimestampCombiner timestampCombiner) {
try {
BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
- getKey(), namespace,
+ getStoreId(id), StateSpecs.<Combine.Holder<Instant>>bag(), getKey(), namespace,
kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
@@ -181,8 +186,11 @@ class JStormStateInternals<K> implements StateInternals {
}
};
return new JStormWatermarkHoldState(
- namespace,
+ id, spec, namespace,
new JStormCombiningState<>(
+ getStoreId(id),
+ StateSpecs.combining(outputTimeCombineFn),
+ namespace,
accumBagState,
outputTimeCombineFn),
timestampCombiner,
@@ -199,12 +207,21 @@ class JStormStateInternals<K> implements StateInternals {
*/
private static class JStormValueState<K, T> implements ValueState<T> {
+ private final String id;
+ private final StateSpec<ValueState<T>> spec;
@Nullable
private final K key;
private final StateNamespace namespace;
private final IKvStore<ComposedKey, T> kvState;
- JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+ JStormValueState(
+ String id,
+ StateSpec<ValueState<T>> spec,
+ @Nullable K key,
+ StateNamespace namespace,
+ IKvStore<ComposedKey, T> kvState) {
+ this.id = id;
+ this.spec = spec;
this.key = key;
this.namespace = namespace;
this.kvState = kvState;
@@ -246,6 +263,29 @@ class JStormStateInternals<K> implements StateInternals {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JStormValueState<?, ?> that = (JStormValueState<?, ?>) o;
+
+ return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + id.hashCode();
+ result = 31 * result + spec.hashCode();
+ return result;
+ }
+
private ComposedKey getComposedKey() {
return ComposedKey.of(key, namespace);
}
@@ -256,6 +296,8 @@ class JStormStateInternals<K> implements StateInternals {
*/
private static class JStormBagState<K, T> implements BagState<T> {
+ private final String id;
+ private final StateSpec<BagState<T>> spec;
@Nullable
private final K key;
private final StateNamespace namespace;
@@ -263,10 +305,14 @@ class JStormStateInternals<K> implements StateInternals {
private final IKvStore<ComposedKey, Object> stateInfoKvState;
JStormBagState(
+ String id,
+ StateSpec<BagState<T>> spec,
@Nullable K key,
StateNamespace namespace,
IKvStore<ComposedKey, T> kvState,
IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+ this.id = id;
+ this.spec = spec;
this.key = key;
this.namespace = checkNotNull(namespace, "namespace");
this.kvState = checkNotNull(kvState, "kvState");
@@ -350,8 +396,31 @@ class JStormStateInternals<K> implements StateInternals {
} catch (IOException e) {
}
- return String.format("JStormBagState: key=%s, namespace=%s, elementIndex=%d",
- key, namespace, elemIndex);
+ return String.format("stateId=%s, key=%s, namespace=%s, elementIndex=%d",
+ id, key, namespace, elemIndex);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JStormBagState<?, ?> that = (JStormBagState<?, ?>) o;
+
+ return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + id.hashCode();
+ result = 31 * result + spec.hashCode();
+ return result;
}
/**
@@ -420,13 +489,22 @@ class JStormStateInternals<K> implements StateInternals {
private static class JStormCombiningState<InputT, AccumT, OutputT>
implements CombiningState<InputT, AccumT, OutputT> {
+ private final String id;
+ private final StateSpec<CombiningState<InputT, AccumT, OutputT>> spec;
+ private final StateNamespace namespace;
@Nullable
private final BagState<AccumT> accumBagState;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
JStormCombiningState(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ StateNamespace namespace,
BagState<AccumT> accumBagState,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.id = id;
+ this.spec = spec;
+ this.namespace = namespace;
this.accumBagState = checkNotNull(accumBagState, "accumBagState");
this.combineFn = checkNotNull(combineFn, "combineFn");
}
@@ -474,6 +552,29 @@ class JStormStateInternals<K> implements StateInternals {
public void clear() {
accumBagState.clear();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JStormCombiningState<?, ?, ?> that = (JStormCombiningState<?, ?, ?>) o;
+
+ return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + id.hashCode();
+ result = 31 * result + spec.hashCode();
+ return result;
+ }
}
/**
@@ -483,11 +584,19 @@ class JStormStateInternals<K> implements StateInternals {
*/
private static class JStormMapState<K, V> implements MapState<K, V> {
+ private final String id;
+ private final StateSpec<MapState<K, V>> spec;
private final K key;
private final StateNamespace namespace;
private IKvStore<K, V> kvStore;
- JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+ JStormMapState(
+ String id,
+ StateSpec<MapState<K, V>> spec,
+ K key,
+ StateNamespace namespace, IKvStore<K, V> kvStore) {
+ this.id = id;
+ this.spec = spec;
this.key = key;
this.namespace = namespace;
this.kvStore = kvStore;
@@ -582,6 +691,29 @@ class JStormStateInternals<K> implements StateInternals {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JStormMapState<?, ?> that = (JStormMapState<?, ?>) o;
+
+ return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + id.hashCode();
+ result = 31 * result + spec.hashCode();
+ return result;
+ }
+
private void reportError(String errorInfo, IOException e) {
LOG.error(errorInfo, e);
throw new RuntimeException(errorInfo);
@@ -611,16 +743,22 @@ class JStormStateInternals<K> implements StateInternals {
*/
private static class JStormWatermarkHoldState implements WatermarkHoldState {
+ private final String id;
+ private final StateSpec<WatermarkHoldState> spec;
private final StateNamespace namespace;
private final GroupingState<Instant, Instant> watermarkHoldsState;
private final TimestampCombiner timestampCombiner;
private final TimerService timerService;
JStormWatermarkHoldState(
+ String id,
+ StateSpec<WatermarkHoldState> spec,
StateNamespace namespace,
GroupingState<Instant, Instant> watermarkHoldsState,
TimestampCombiner timestampCombiner,
TimerService timerService) {
+ this.id = checkNotNull(id, "id");
+ this.spec = checkNotNull(spec, "spec");
this.namespace = checkNotNull(namespace, "namespace");
this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
@@ -659,6 +797,32 @@ class JStormStateInternals<K> implements StateInternals {
timerService.clearWatermarkHold(namespace.stringKey());
watermarkHoldsState.clear();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JStormWatermarkHoldState that = (JStormWatermarkHoldState) o;
+
+ return namespace.equals(that.namespace)
+ && id.equals(that.id)
+ && spec.equals(that.spec)
+ && timestampCombiner.equals(that.timestampCombiner);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + id.hashCode();
+ result = 31 * result + spec.hashCode();
+ result = 31 * result + timestampCombiner.hashCode();
+ return result;
+ }
}
private String getStoreId(String stateId) {