You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:35 UTC
[35/53] [abbrv] beam git commit: jstorm-runner: 1. Add kryo
serializer for Collections.SingletonLists 2. Fix concurrent problem of
elementIndex of JStormBagState
jstorm-runner:
1. Add kryo serializer for Collections.SingletonLists
2. Fix concurrent problem of elementIndex of JStormBagState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/201ef722
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/201ef722
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/201ef722
Branch: refs/heads/jstorm-runner
Commit: 201ef722ec36b0ffa8197722fdf898fb9978803c
Parents: 1bf3224
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Jul 19 20:15:56 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/JStormRunner.java | 2 +
.../serialization/CollectionsSerializer.java | 43 +++++++++++++++++++
.../jstorm/translation/ExecutorsBolt.java | 2 +-
.../translation/JStormStateInternals.java | 44 +++++++++++++-------
4 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 286a975..56db1c6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -31,6 +31,7 @@ import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
import java.util.HashMap;
import java.util.Map;
+import org.apache.beam.runners.jstorm.serialization.CollectionsSerializer;
import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer;
import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
@@ -105,6 +106,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
SdkRepackImmuSetSerializer.registerSerializers(config);
ImmutableMapSerializer.registerSerializers(config);
SdkRepackImmutableMapSerializer.registerSerializers(config);
+ CollectionsSerializer.registerSerializers(config);
config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
return config;
http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
new file mode 100644
index 0000000..0548196
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
@@ -0,0 +1,43 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Specific serializer of {@link Kryo} for Collections.
+ */
+public class CollectionsSerializer {
+
+ /**
+ * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}.
+ */
+ public static class CollectionsSingletonListSerializer extends Serializer<List<?>> {
+ public CollectionsSingletonListSerializer() {
+ setImmutable(true);
+ }
+
+ @Override
+ public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
+ final Object obj = kryo.readClassAndObject(input);
+ return Collections.singletonList(obj);
+ }
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final List<?> list) {
+ kryo.writeClassAndObject(output, list.get(0));
+ }
+
+ }
+
+ public static void registerSerializers(Config config) {
+ config.registerSerialization(Collections.singletonList("").getClass(),
+ CollectionsSingletonListSerializer.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index ce6ea2c..33393f2 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -240,7 +240,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
}
public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
- LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+ LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
if (elem != null) {
Executor executor = inputTagToExecutor.get(inputTag);
if (executor != null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 3b6b4d5..68a17e5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -261,7 +261,6 @@ class JStormStateInternals<K> implements StateInternals {
private final StateNamespace namespace;
private final IKvStore<ComposedKey, T> kvState;
private final IKvStore<ComposedKey, Object> stateInfoKvState;
- private int elemIndex;
JStormBagState(
@Nullable K key,
@@ -272,17 +271,19 @@ class JStormStateInternals<K> implements StateInternals {
this.namespace = checkNotNull(namespace, "namespace");
this.kvState = checkNotNull(kvState, "kvState");
this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+ }
- Integer index = (Integer) stateInfoKvState.get(getComposedKey());
- this.elemIndex = index != null ? ++index : 0;
+ private int getElementIndex() throws IOException {
+ Integer elementIndex = (Integer) stateInfoKvState.get(getComposedKey());
+ return elementIndex != null ? elementIndex : 0;
}
@Override
public void add(T input) {
try {
+ int elemIndex = getElementIndex();
kvState.put(getComposedKey(elemIndex), input);
- stateInfoKvState.put(getComposedKey(), elemIndex);
- elemIndex++;
+ stateInfoKvState.put(getComposedKey(), ++elemIndex);
} catch (IOException e) {
throw new RuntimeException(e.getCause());
}
@@ -293,7 +294,12 @@ class JStormStateInternals<K> implements StateInternals {
return new ReadableState<Boolean>() {
@Override
public Boolean read() {
- return elemIndex <= 0;
+ try {
+ return getElementIndex() <= 0;
+ } catch (IOException e) {
+ LOG.error("Failed to read", e);
+ return false;
+ }
}
@Override
@@ -306,7 +312,7 @@ class JStormStateInternals<K> implements StateInternals {
@Override
public Iterable<T> read() {
- return new BagStateIterable(elemIndex);
+ return new BagStateIterable();
}
@Override
@@ -318,11 +324,11 @@ class JStormStateInternals<K> implements StateInternals {
@Override
public void clear() {
try {
+ int elemIndex = getElementIndex();
for (int i = 0; i < elemIndex; i++) {
kvState.remove(getComposedKey(i));
}
stateInfoKvState.remove(getComposedKey());
- elemIndex = 0;
} catch (IOException e) {
throw new RuntimeException(e.getCause());
}
@@ -336,6 +342,18 @@ class JStormStateInternals<K> implements StateInternals {
return ComposedKey.of(key, namespace, elemIndex);
}
+ @Override
+ public String toString() {
+ int elemIndex = -1;
+ try {
+ elemIndex = getElementIndex();
+ } catch (IOException e) {
+
+ }
+ return String.format("JStormBagState: key=%s, namespace=%s, elementIndex=%d",
+ key, namespace, elemIndex);
+ }
+
/**
* Implementation of Bag state Iterable.
*/
@@ -346,13 +364,11 @@ class JStormStateInternals<K> implements StateInternals {
private int cursor = 0;
BagStateIterator() {
- Integer s = null;
try {
- s = (Integer) stateInfoKvState.get(getComposedKey());
+ this.size = getElementIndex();
} catch (IOException e) {
- LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+ throw new RuntimeException(e.getCause());
}
- this.size = s != null ? ++s : 0;
}
@Override
@@ -382,10 +398,8 @@ class JStormStateInternals<K> implements StateInternals {
}
}
- private final int size;
+ BagStateIterable() {
- BagStateIterable(int size) {
- this.size = size;
}
@Override