You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:35 UTC

[35/53] [abbrv] beam git commit: jstorm-runner: 1. Add Kryo serializer for Collections.SingletonList 2. Fix concurrency problem with elementIndex in JStormBagState

jstorm-runner:
1. Add Kryo serializer for Collections.SingletonList
2. Fix concurrency problem with elementIndex in JStormBagState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/201ef722
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/201ef722
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/201ef722

Branch: refs/heads/jstorm-runner
Commit: 201ef722ec36b0ffa8197722fdf898fb9978803c
Parents: 1bf3224
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Jul 19 20:15:56 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunner.java       |  2 +
 .../serialization/CollectionsSerializer.java    | 43 +++++++++++++++++++
 .../jstorm/translation/ExecutorsBolt.java       |  2 +-
 .../translation/JStormStateInternals.java       | 44 +++++++++++++-------
 4 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 286a975..56db1c6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -31,6 +31,7 @@ import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.runners.jstorm.serialization.CollectionsSerializer;
 import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
 import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer;
 import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
@@ -105,6 +106,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
     SdkRepackImmuSetSerializer.registerSerializers(config);
     ImmutableMapSerializer.registerSerializers(config);
     SdkRepackImmutableMapSerializer.registerSerializers(config);
+    CollectionsSerializer.registerSerializers(config);
 
     config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
     return config;

http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
new file mode 100644
index 0000000..0548196
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
@@ -0,0 +1,43 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Specific serializer of {@link Kryo} for Collections.
+ */
+public class CollectionsSerializer {
+
+  /**
+   * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}.
+   */
+  public static class CollectionsSingletonListSerializer extends Serializer<List<?>> {
+    public CollectionsSingletonListSerializer() {
+      setImmutable(true);
+    }
+
+    @Override
+    public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
+      final Object obj = kryo.readClassAndObject(input);
+      return Collections.singletonList(obj);
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final List<?> list) {
+      kryo.writeClassAndObject(output, list.get(0));
+    }
+
+  }
+
+  public static void registerSerializers(Config config) {
+    config.registerSerialization(Collections.singletonList("").getClass(),
+            CollectionsSingletonListSerializer.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index ce6ea2c..33393f2 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -240,7 +240,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
   }
 
   public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-    LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+    LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
     if (elem != null) {
       Executor executor = inputTagToExecutor.get(inputTag);
       if (executor != null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 3b6b4d5..68a17e5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -261,7 +261,6 @@ class JStormStateInternals<K> implements StateInternals {
     private final StateNamespace namespace;
     private final IKvStore<ComposedKey, T> kvState;
     private final IKvStore<ComposedKey, Object> stateInfoKvState;
-    private int elemIndex;
 
     JStormBagState(
         @Nullable K key,
@@ -272,17 +271,19 @@ class JStormStateInternals<K> implements StateInternals {
       this.namespace = checkNotNull(namespace, "namespace");
       this.kvState = checkNotNull(kvState, "kvState");
       this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+    }
 
-      Integer index = (Integer) stateInfoKvState.get(getComposedKey());
-      this.elemIndex = index != null ? ++index : 0;
+    private int getElementIndex() throws IOException {
+      Integer elementIndex = (Integer) stateInfoKvState.get(getComposedKey());
+      return elementIndex != null ? elementIndex : 0;
     }
 
     @Override
     public void add(T input) {
       try {
+        int elemIndex = getElementIndex();
         kvState.put(getComposedKey(elemIndex), input);
-        stateInfoKvState.put(getComposedKey(), elemIndex);
-        elemIndex++;
+        stateInfoKvState.put(getComposedKey(), ++elemIndex);
       } catch (IOException e) {
         throw new RuntimeException(e.getCause());
       }
@@ -293,7 +294,12 @@ class JStormStateInternals<K> implements StateInternals {
       return new ReadableState<Boolean>() {
         @Override
         public Boolean read() {
-          return elemIndex <= 0;
+          try {
+            return getElementIndex() <= 0;
+          } catch (IOException e) {
+            LOG.error("Failed to read", e);
+            return false;
+          }
         }
 
         @Override
@@ -306,7 +312,7 @@ class JStormStateInternals<K> implements StateInternals {
 
     @Override
     public Iterable<T> read() {
-      return new BagStateIterable(elemIndex);
+      return new BagStateIterable();
     }
 
     @Override
@@ -318,11 +324,11 @@ class JStormStateInternals<K> implements StateInternals {
     @Override
     public void clear() {
       try {
+        int elemIndex = getElementIndex();
         for (int i = 0; i < elemIndex; i++) {
           kvState.remove(getComposedKey(i));
         }
         stateInfoKvState.remove(getComposedKey());
-        elemIndex = 0;
       } catch (IOException e) {
         throw new RuntimeException(e.getCause());
       }
@@ -336,6 +342,18 @@ class JStormStateInternals<K> implements StateInternals {
       return ComposedKey.of(key, namespace, elemIndex);
     }
 
+    @Override
+    public String toString() {
+      int elemIndex = -1;
+      try {
+        elemIndex = getElementIndex();
+      } catch (IOException e) {
+        // Deliberately ignored: toString() is best-effort, so elemIndex stays at the -1 placeholder.
+      }
+      return String.format("JStormBagState: key=%s, namespace=%s, elementIndex=%d",
+              key, namespace, elemIndex);
+    }
+
     /**
      * Implementation of Bag state Iterable.
      */
@@ -346,13 +364,11 @@ class JStormStateInternals<K> implements StateInternals {
         private int cursor = 0;
 
         BagStateIterator() {
-          Integer s = null;
           try {
-            s = (Integer) stateInfoKvState.get(getComposedKey());
+            this.size = getElementIndex();
           } catch (IOException e) {
-            LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+            throw new RuntimeException(e.getCause());
           }
-          this.size = s != null ? ++s : 0;
         }
 
         @Override
@@ -382,10 +398,8 @@ class JStormStateInternals<K> implements StateInternals {
         }
       }
 
-      private final int size;
+      BagStateIterable() {
 
-      BagStateIterable(int size) {
-        this.size = size;
       }
 
       @Override