You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:27 UTC
[27/53] [abbrv] beam git commit: jstorm-runner: move jstorm state
implementations to JStormStateInternals inner classes.
jstorm-runner: move jstorm state implementations to JStormStateInternals inner classes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9abbbd06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9abbbd06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9abbbd06
Branch: refs/heads/jstorm-runner
Commit: 9abbbd064e878a961ff3e8fc62d96ea650fd7570
Parents: 8cdd41b
Author: Pei He <pe...@apache.org>
Authored: Fri Jul 14 16:10:29 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800
----------------------------------------------------------------------
.../jstorm/translation/JStormBagState.java | 180 -------
.../translation/JStormCombiningState.java | 88 ----
.../jstorm/translation/JStormMapState.java | 158 -------
.../translation/JStormStateInternals.java | 464 +++++++++++++++++++
.../jstorm/translation/JStormValueState.java | 82 ----
.../translation/JStormWatermarkHoldState.java | 82 ----
6 files changed, 464 insertions(+), 590 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
deleted file mode 100644
index 3e5d52b..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link BagState} in JStorm runner.
- */
-class JStormBagState<K, T> implements BagState<T> {
- private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
- @Nullable
- private final K key;
- private final StateNamespace namespace;
- private final IKvStore<ComposedKey, T> kvState;
- private final IKvStore<ComposedKey, Object> stateInfoKvState;
- private int elemIndex;
-
- public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
- IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
- this.key = key;
- this.namespace = checkNotNull(namespace, "namespace");
- this.kvState = checkNotNull(kvState, "kvState");
- this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
- Integer index = (Integer) stateInfoKvState.get(getComposedKey());
- this.elemIndex = index != null ? ++index : 0;
- }
-
- @Override
- public void add(T input) {
- try {
- kvState.put(getComposedKey(elemIndex), input);
- stateInfoKvState.put(getComposedKey(), elemIndex);
- elemIndex++;
- } catch (IOException e) {
- throw new RuntimeException(e.getCause());
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- return elemIndex <= 0;
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- // TODO: support prefetch.
- return this;
- }
- };
- }
-
- @Override
- public Iterable<T> read() {
- return new BagStateIterable(elemIndex);
- }
-
- @Override
- public BagState readLater() {
- // TODO: support prefetch.
- return this;
- }
-
- @Override
- public void clear() {
- try {
- for (int i = 0; i < elemIndex; i++) {
- kvState.remove(getComposedKey(i));
- }
- stateInfoKvState.remove(getComposedKey());
- elemIndex = 0;
- } catch (IOException e) {
- throw new RuntimeException(e.getCause());
- }
- }
-
- private ComposedKey getComposedKey() {
- return ComposedKey.of(key, namespace);
- }
-
- private ComposedKey getComposedKey(int elemIndex) {
- return ComposedKey.of(key, namespace, elemIndex);
- }
-
- /**
- * Implementation of Bag state Iterable.
- */
- private class BagStateIterable implements KvStoreIterable<T> {
-
- private class BagStateIterator implements Iterator<T> {
- private final int size;
- private int cursor = 0;
-
- BagStateIterator() {
- Integer s = null;
- try {
- s = (Integer) stateInfoKvState.get(getComposedKey());
- } catch (IOException e) {
- LOG.error("Failed to get elemIndex for key={}", getComposedKey());
- }
- this.size = s != null ? ++s : 0;
- }
-
- @Override
- public boolean hasNext() {
- return cursor < size;
- }
-
- @Override
- public T next() {
- if (cursor >= size) {
- throw new NoSuchElementException();
- }
-
- T value = null;
- try {
- value = kvState.get(getComposedKey(cursor));
- } catch (IOException e) {
- LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
- }
- cursor++;
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private final int size;
-
- BagStateIterable(int size) {
- this.size = size;
- }
-
- @Override
- public Iterator<T> iterator() {
- return new BagStateIterator();
- }
-
- @Override
- public String toString() {
- return String.format("BagStateIterable: composedKey=%s", getComposedKey());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
deleted file mode 100644
index 6bd021f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * JStorm implementation of {@link CombiningState}.
- */
-class JStormCombiningState<InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- @Nullable
- private final BagState<AccumT> accumBagState;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
- JStormCombiningState(
- BagState<AccumT> accumBagState,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- this.accumBagState = checkNotNull(accumBagState, "accumBagState");
- this.combineFn = checkNotNull(combineFn, "combineFn");
- }
-
- @Override
- public AccumT getAccum() {
- // TODO: replacing the accumBagState with the merged accum.
- return combineFn.mergeAccumulators(accumBagState.read());
- }
-
- @Override
- public void addAccum(AccumT accumT) {
- accumBagState.add(accumT);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
- return combineFn.mergeAccumulators(iterable);
- }
-
- @Override
- public void add(InputT input) {
- accumBagState.add(
- combineFn.addInput(combineFn.createAccumulator(), input));
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return accumBagState.isEmpty();
- }
-
- @Override
- public OutputT read() {
- return combineFn.extractOutput(
- combineFn.mergeAccumulators(accumBagState.read()));
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- // TODO: support prefetch.
- return this;
- }
-
- @Override
- public void clear() {
- accumBagState.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
deleted file mode 100644
index 6a4e376..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link MapState} in JStorm runner.
- * @param <K>
- * @param <V>
- */
-class JStormMapState<K, V> implements MapState<K, V> {
- private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
- private final K key;
- private final StateNamespace namespace;
- private IKvStore<K, V> kvStore;
-
- public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
- this.key = key;
- this.namespace = namespace;
- this.kvStore = kvStore;
- }
-
- @Override
- public void put(K var1, V var2) {
- try {
- kvStore.put(var1, var2);
- } catch (IOException e) {
- reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
- }
- }
-
- @Override
- public ReadableState<V> putIfAbsent(K var1, V var2) {
- ReadableState<V> ret = null;
- try {
- V value = kvStore.get(var1);
- if (value == null) {
- kvStore.put(var1, var2);
- ret = new MapReadableState<>(null);
- } else {
- ret = new MapReadableState<>(value);
- }
- } catch (IOException e) {
- reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
- }
- return ret;
- }
-
- @Override
- public void remove(K var1) {
- try {
- kvStore.remove(var1);
- } catch (IOException e) {
- reportError(String.format("Failed to remove key=%s", var1), e);
- }
- }
-
- @Override
- public ReadableState<V> get(K var1) {
- ReadableState<V> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState(kvStore.get(var1));
- } catch (IOException e) {
- reportError(String.format("Failed to get value for key=%s", var1), e);
- }
- return ret;
- }
-
- @Override
- public ReadableState<Iterable<K>> keys() {
- ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState<>(kvStore.keys());
- } catch (IOException e) {
- reportError(String.format("Failed to get keys"), e);
- }
- return ret;
- }
-
- @Override
- public ReadableState<Iterable<V>> values() {
- ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState<>(kvStore.values());
- } catch (IOException e) {
- reportError(String.format("Failed to get values"), e);
- }
- return ret;
- }
-
- @Override
- public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
- ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState<>(kvStore.entries());
- } catch (IOException e) {
- reportError(String.format("Failed to get values"), e);
- }
- return ret;
- }
-
- @Override
- public void clear() {
- try {
- Iterable<K> keys = kvStore.keys();
- kvStore.removeBatch(keys);
- } catch (IOException e) {
- reportError(String.format("Failed to clear map state"), e);
- }
- }
-
- private void reportError(String errorInfo, IOException e) {
- LOG.error(errorInfo, e);
- throw new RuntimeException(errorInfo);
- }
-
- private class MapReadableState<T> implements ReadableState<T> {
- private T value;
-
- public MapReadableState(T value) {
- this.value = value;
- }
-
- @Override
- public T read() {
- return value;
- }
-
- @Override
- public ReadableState<T> readLater() {
- return this;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 78882f2..3b6b4d5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -20,8 +20,13 @@ package org.apache.beam.runners.jstorm.translation;
import static com.google.common.base.Preconditions.checkNotNull;
import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreIterable;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
@@ -29,7 +34,9 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateBinder;
@@ -42,12 +49,16 @@ import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* JStorm implementation of {@link StateInternals}.
*/
class JStormStateInternals<K> implements StateInternals {
+ private static final Logger LOG = LoggerFactory.getLogger(JStormStateInternals.class);
+
private static final String STATE_INFO = "state-info:";
@Nullable
@@ -183,6 +194,459 @@ class JStormStateInternals<K> implements StateInternals {
});
}
+ /**
+ * JStorm implementation of {@link ValueState}.
+ */
+ private static class JStormValueState<K, T> implements ValueState<T> {
+
+ @Nullable
+ private final K key;
+ private final StateNamespace namespace;
+ private final IKvStore<ComposedKey, T> kvState;
+
+ JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+ this.key = key;
+ this.namespace = namespace;
+ this.kvState = kvState;
+ }
+
+ @Override
+ public void write(T t) {
+ try {
+ kvState.put(getComposedKey(), t);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
+ }
+ }
+
+ @Override
+ public T read() {
+ try {
+ return kvState.get(getComposedKey());
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Failed to read key: %s, namespace: %s.", key, namespace));
+ }
+ }
+
+ @Override
+ public ValueState<T> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ try {
+ kvState.remove(getComposedKey());
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Failed to clear key: %s, namespace: %s.", key, namespace));
+ }
+ }
+
+ private ComposedKey getComposedKey() {
+ return ComposedKey.of(key, namespace);
+ }
+ }
+
+ /**
+ * Implementation of {@link BagState} in JStorm runner.
+ */
+ private static class JStormBagState<K, T> implements BagState<T> {
+
+ @Nullable
+ private final K key;
+ private final StateNamespace namespace;
+ private final IKvStore<ComposedKey, T> kvState;
+ private final IKvStore<ComposedKey, Object> stateInfoKvState;
+ private int elemIndex;
+
+ JStormBagState(
+ @Nullable K key,
+ StateNamespace namespace,
+ IKvStore<ComposedKey, T> kvState,
+ IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+ this.key = key;
+ this.namespace = checkNotNull(namespace, "namespace");
+ this.kvState = checkNotNull(kvState, "kvState");
+ this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+ Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+ this.elemIndex = index != null ? ++index : 0;
+ }
+
+ @Override
+ public void add(T input) {
+ try {
+ kvState.put(getComposedKey(elemIndex), input);
+ stateInfoKvState.put(getComposedKey(), elemIndex);
+ elemIndex++;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public Boolean read() {
+ return elemIndex <= 0;
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public Iterable<T> read() {
+ return new BagStateIterable(elemIndex);
+ }
+
+ @Override
+ public BagState readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ try {
+ for (int i = 0; i < elemIndex; i++) {
+ kvState.remove(getComposedKey(i));
+ }
+ stateInfoKvState.remove(getComposedKey());
+ elemIndex = 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ private ComposedKey getComposedKey() {
+ return ComposedKey.of(key, namespace);
+ }
+
+ private ComposedKey getComposedKey(int elemIndex) {
+ return ComposedKey.of(key, namespace, elemIndex);
+ }
+
+ /**
+ * Implementation of Bag state Iterable.
+ */
+ private class BagStateIterable implements KvStoreIterable<T> {
+
+ private class BagStateIterator implements Iterator<T> {
+ private final int size;
+ private int cursor = 0;
+
+ BagStateIterator() {
+ Integer s = null;
+ try {
+ s = (Integer) stateInfoKvState.get(getComposedKey());
+ } catch (IOException e) {
+ LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+ }
+ this.size = s != null ? ++s : 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cursor < size;
+ }
+
+ @Override
+ public T next() {
+ if (cursor >= size) {
+ throw new NoSuchElementException();
+ }
+
+ T value = null;
+ try {
+ value = kvState.get(getComposedKey(cursor));
+ } catch (IOException e) {
+ LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
+ }
+ cursor++;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private final int size;
+
+ BagStateIterable(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new BagStateIterator();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+ }
+ }
+ }
+
+ /**
+ * JStorm implementation of {@link CombiningState}.
+ */
+ private static class JStormCombiningState<InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
+
+ @Nullable
+ private final BagState<AccumT> accumBagState;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+ JStormCombiningState(
+ BagState<AccumT> accumBagState,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+ this.combineFn = checkNotNull(combineFn, "combineFn");
+ }
+
+ @Override
+ public AccumT getAccum() {
+ // TODO: replacing the accumBagState with the merged accum.
+ return combineFn.mergeAccumulators(accumBagState.read());
+ }
+
+ @Override
+ public void addAccum(AccumT accumT) {
+ accumBagState.add(accumT);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+ return combineFn.mergeAccumulators(iterable);
+ }
+
+ @Override
+ public void add(InputT input) {
+ accumBagState.add(
+ combineFn.addInput(combineFn.createAccumulator(), input));
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return accumBagState.isEmpty();
+ }
+
+ @Override
+ public OutputT read() {
+ return combineFn.extractOutput(
+ combineFn.mergeAccumulators(accumBagState.read()));
+ }
+
+ @Override
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ accumBagState.clear();
+ }
+ }
+
+ /**
+ * Implementation of {@link MapState} in JStorm runner.
+ * @param <K>
+ * @param <V>
+ */
+ private static class JStormMapState<K, V> implements MapState<K, V> {
+
+ private final K key;
+ private final StateNamespace namespace;
+ private IKvStore<K, V> kvStore;
+
+ JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+ this.key = key;
+ this.namespace = namespace;
+ this.kvStore = kvStore;
+ }
+
+ @Override
+ public void put(K var1, V var2) {
+ try {
+ kvStore.put(var1, var2);
+ } catch (IOException e) {
+ reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
+ }
+ }
+
+ @Override
+ public ReadableState<V> putIfAbsent(K var1, V var2) {
+ ReadableState<V> ret = null;
+ try {
+ V value = kvStore.get(var1);
+ if (value == null) {
+ kvStore.put(var1, var2);
+ ret = new MapReadableState<>(null);
+ } else {
+ ret = new MapReadableState<>(value);
+ }
+ } catch (IOException e) {
+ reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove(K var1) {
+ try {
+ kvStore.remove(var1);
+ } catch (IOException e) {
+ reportError(String.format("Failed to remove key=%s", var1), e);
+ }
+ }
+
+ @Override
+ public ReadableState<V> get(K var1) {
+ ReadableState<V> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState(kvStore.get(var1));
+ } catch (IOException e) {
+ reportError(String.format("Failed to get value for key=%s", var1), e);
+ }
+ return ret;
+ }
+
+ @Override
+ public ReadableState<Iterable<K>> keys() {
+ ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.keys());
+ } catch (IOException e) {
+ reportError(String.format("Failed to get keys"), e);
+ }
+ return ret;
+ }
+
+ @Override
+ public ReadableState<Iterable<V>> values() {
+ ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.values());
+ } catch (IOException e) {
+ reportError(String.format("Failed to get values"), e);
+ }
+ return ret;
+ }
+
+ @Override
+ public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+ ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.entries());
+ } catch (IOException e) {
+ reportError(String.format("Failed to get values"), e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void clear() {
+ try {
+ Iterable<K> keys = kvStore.keys();
+ kvStore.removeBatch(keys);
+ } catch (IOException e) {
+ reportError(String.format("Failed to clear map state"), e);
+ }
+ }
+
+ private void reportError(String errorInfo, IOException e) {
+ LOG.error(errorInfo, e);
+ throw new RuntimeException(errorInfo);
+ }
+
+ private class MapReadableState<T> implements ReadableState<T> {
+ private T value;
+
+ public MapReadableState(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public T read() {
+ return value;
+ }
+
+ @Override
+ public ReadableState<T> readLater() {
+ return this;
+ }
+ }
+ }
+
+ /**
+ * JStorm implementation of {@link WatermarkHoldState}.
+ */
+ private static class JStormWatermarkHoldState implements WatermarkHoldState {
+
+ private final StateNamespace namespace;
+ private final GroupingState<Instant, Instant> watermarkHoldsState;
+ private final TimestampCombiner timestampCombiner;
+ private final TimerService timerService;
+
+ JStormWatermarkHoldState(
+ StateNamespace namespace,
+ GroupingState<Instant, Instant> watermarkHoldsState,
+ TimestampCombiner timestampCombiner,
+ TimerService timerService) {
+ this.namespace = checkNotNull(namespace, "namespace");
+ this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+ this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+ this.timerService = checkNotNull(timerService, "timerService");
+ }
+
+ @Override
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
+ }
+
+ @Override
+ public void add(Instant instant) {
+ timerService.addWatermarkHold(namespace.stringKey(), instant);
+ watermarkHoldsState.add(instant);
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return watermarkHoldsState.isEmpty();
+ }
+
+ @Override
+ public Instant read() {
+ return watermarkHoldsState.read();
+ }
+
+ @Override
+ public WatermarkHoldState readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ timerService.clearWatermarkHold(namespace.stringKey());
+ watermarkHoldsState.clear();
+ }
+ }
+
private String getStoreId(String stateId) {
return String.format("%s-%s", stateId, executorId);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
deleted file mode 100644
index 5d79d21..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.ValueState;
-
-/**
- * JStorm implementation of {@link ValueState}.
- */
-class JStormValueState<K, T> implements ValueState<T> {
-
- @Nullable
- private final K key;
- private final StateNamespace namespace;
- private final IKvStore<ComposedKey, T> kvState;
-
- JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
- this.key = key;
- this.namespace = namespace;
- this.kvState = kvState;
- }
-
- @Override
- public void write(T t) {
- try {
- kvState.put(getComposedKey(), t);
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
- }
- }
-
- @Override
- public T read() {
- try {
- return kvState.get(getComposedKey());
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Failed to read key: %s, namespace: %s.", key, namespace));
- }
- }
-
- @Override
- public ValueState<T> readLater() {
- // TODO: support prefetch.
- return this;
- }
-
- @Override
- public void clear() {
- try {
- kvState.remove(getComposedKey());
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Failed to clear key: %s, namespace: %s.", key, namespace));
- }
- }
-
- private ComposedKey getComposedKey() {
- return ComposedKey.of(key, namespace);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
deleted file mode 100644
index 7e1c28f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link WatermarkHoldState}.
- */
-class JStormWatermarkHoldState implements WatermarkHoldState {
-
- private final StateNamespace namespace;
- private final GroupingState<Instant, Instant> watermarkHoldsState;
- private final TimestampCombiner timestampCombiner;
- private final TimerService timerService;
-
- JStormWatermarkHoldState(
- StateNamespace namespace,
- GroupingState<Instant, Instant> watermarkHoldsState,
- TimestampCombiner timestampCombiner,
- TimerService timerService) {
- this.namespace = checkNotNull(namespace, "namespace");
- this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
- this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
- this.timerService = checkNotNull(timerService, "timerService");
- }
-
- @Override
- public TimestampCombiner getTimestampCombiner() {
- return timestampCombiner;
- }
-
- @Override
- public void add(Instant instant) {
- timerService.addWatermarkHold(namespace.stringKey(), instant);
- watermarkHoldsState.add(instant);
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return watermarkHoldsState.isEmpty();
- }
-
- @Override
- public Instant read() {
- return watermarkHoldsState.read();
- }
-
- @Override
- public WatermarkHoldState readLater() {
- // TODO: support prefetch.
- return this;
- }
-
- @Override
- public void clear() {
- timerService.clearWatermarkHold(namespace.stringKey());
- watermarkHoldsState.clear();
- }
-}