You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:39 UTC

[39/53] [abbrv] beam git commit: jstorm-runner: add SdkRepackImmutableMapSerializer.

jstorm-runner: add SdkRepackImmutableMapSerializer.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/588a6981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/588a6981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/588a6981

Branch: refs/heads/jstorm-runner
Commit: 588a6981855b68b9733a1b0f368dce0ad5cfe837
Parents: ad04648
Author: Pei He <pe...@apache.org>
Authored: Wed Jul 19 11:13:04 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunner.java       |  2 +
 .../SdkRepackImmutableMapSerializer.java        | 73 ++++++++++++++++++++
 2 files changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index baf4e5a..286a975 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
 import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
 import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
 import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
+import org.apache.beam.runners.jstorm.serialization.SdkRepackImmutableMapSerializer;
 import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
 import org.apache.beam.runners.jstorm.translation.AbstractComponent;
 import org.apache.beam.runners.jstorm.translation.CommonInstance;
@@ -103,6 +104,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
     ImmutableSetSerializer.registerSerializers(config);
     SdkRepackImmuSetSerializer.registerSerializers(config);
     ImmutableMapSerializer.registerSerializers(config);
+    SdkRepackImmutableMapSerializer.registerSerializers(config);
 
     config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
     return config;

http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
new file mode 100644
index 0000000..546538a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
@@ -0,0 +1,73 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
+
+/**
+ * {@link Kryo} serializer for the SDK-repackaged Guava {@link ImmutableMap}, via a HashMap copy.
+ */
+public class SdkRepackImmutableMapSerializer
+    extends Serializer<ImmutableMap<Object, ? extends Object>> {
+
+  private static final boolean DOES_NOT_ACCEPT_NULL = true;
+  private static final boolean IMMUTABLE = true;
+
+  public SdkRepackImmutableMapSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); // NOTE(review): 1st arg = Kryo acceptsNull? verify
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
+    kryo.writeObject(output, Maps.newHashMap(immutableMap)); // written as a HashMap copy
+  }
+
+  @Override
+  public ImmutableMap<Object, Object> read(
+      Kryo kryo,
+      Input input,
+      Class<ImmutableMap<Object, ? extends Object>> type) {
+    Map map = kryo.readObject(input, HashMap.class); // mirrors write(): a HashMap was written
+    return ImmutableMap.copyOf(map); // copy back into an ImmutableMap
+  }
+
+  /**
+   * Registers {@link SdkRepackImmutableMapSerializer} on {@code config} for {@link ImmutableMap}
+   * and the runtime classes of empty, 1-entry, 2-entry and enum-keyed immutable maps.
+   */
+  public static void registerSerializers(Config config) {
+
+    config.registerSerialization(ImmutableMap.class, SdkRepackImmutableMapSerializer.class);
+    config.registerSerialization(
+        ImmutableMap.of().getClass(), SdkRepackImmutableMapSerializer.class);
+
+    Object o1 = new Object(); // placeholder entries, used only to obtain runtime classes
+    Object o2 = new Object();
+
+    config.registerSerialization(
+        ImmutableMap.of(o1, o1).getClass(), SdkRepackImmutableMapSerializer.class);
+    config.registerSerialization(
+        ImmutableMap.of(o1, o1, o2, o2).getClass(),
+        SdkRepackImmutableMapSerializer.class);
+    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+    for (DummyEnum e : DummyEnum.values()) {
+      enumMap.put(e, o1);
+    }
+
+    config.registerSerialization(
+        ImmutableMap.copyOf(enumMap).getClass(), // class returned for an EnumMap source
+        SdkRepackImmutableMapSerializer.class);
+  }
+
+  private enum DummyEnum { // exists only to populate the EnumMap in registerSerializers
+    VALUE1,
+    VALUE2
+  }
+}