You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:39 UTC
[39/53] [abbrv] beam git commit: jstorm-runner: add
SdkRepackImmutableMapSerializer.
jstorm-runner: add SdkRepackImmutableMapSerializer.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/588a6981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/588a6981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/588a6981
Branch: refs/heads/jstorm-runner
Commit: 588a6981855b68b9733a1b0f368dce0ad5cfe837
Parents: ad04648
Author: Pei He <pe...@apache.org>
Authored: Wed Jul 19 11:13:04 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/JStormRunner.java | 2 +
.../SdkRepackImmutableMapSerializer.java | 73 ++++++++++++++++++++
2 files changed, 75 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index baf4e5a..286a975 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
+import org.apache.beam.runners.jstorm.serialization.SdkRepackImmutableMapSerializer;
import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
import org.apache.beam.runners.jstorm.translation.AbstractComponent;
import org.apache.beam.runners.jstorm.translation.CommonInstance;
@@ -103,6 +104,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
ImmutableSetSerializer.registerSerializers(config);
SdkRepackImmuSetSerializer.registerSerializers(config);
ImmutableMapSerializer.registerSerializers(config);
+ SdkRepackImmutableMapSerializer.registerSerializers(config);
config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
return config;
http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
new file mode 100644
index 0000000..546538a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
@@ -0,0 +1,73 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
+
+/**
+ * Specific serializer of {@link Kryo} for ImmutableMap.
+ */
+public class SdkRepackImmutableMapSerializer
+ extends Serializer<ImmutableMap<Object, ? extends Object>> {
+
+ private static final boolean DOES_NOT_ACCEPT_NULL = true;
+ private static final boolean IMMUTABLE = true;
+
+ public SdkRepackImmutableMapSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
+ kryo.writeObject(output, Maps.newHashMap(immutableMap));
+ }
+
+ @Override
+ public ImmutableMap<Object, Object> read(
+ Kryo kryo,
+ Input input,
+ Class<ImmutableMap<Object, ? extends Object>> type) {
+ Map map = kryo.readObject(input, HashMap.class);
+ return ImmutableMap.copyOf(map);
+ }
+
+ /**
+ * Creates a new {@link SdkRepackImmutableMapSerializer} and registers its serializer
+ * for the several ImmutableMap related classes.
+ */
+ public static void registerSerializers(Config config) {
+
+ config.registerSerialization(ImmutableMap.class, SdkRepackImmutableMapSerializer.class);
+ config.registerSerialization(
+ ImmutableMap.of().getClass(), SdkRepackImmutableMapSerializer.class);
+
+ Object o1 = new Object();
+ Object o2 = new Object();
+
+ config.registerSerialization(
+ ImmutableMap.of(o1, o1).getClass(), SdkRepackImmutableMapSerializer.class);
+ config.registerSerialization(
+ ImmutableMap.of(o1, o1, o2, o2).getClass(),
+ SdkRepackImmutableMapSerializer.class);
+ Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+ for (DummyEnum e : DummyEnum.values()) {
+ enumMap.put(e, o1);
+ }
+
+ config.registerSerialization(
+ ImmutableMap.copyOf(enumMap).getClass(),
+ SdkRepackImmutableMapSerializer.class);
+ }
+
+ private enum DummyEnum {
+ VALUE1,
+ VALUE2
+ }
+}