You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:50 UTC
[50/53] [abbrv] beam git commit: jstorm-runner: Add Kryo serializer
for UnmodifiableIterable
jstorm-runner: Add Kryo serializer for UnmodifiableIterable
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e808730
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e808730
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e808730
Branch: refs/heads/jstorm-runner
Commit: 9e8087306b5562fdecf678979b9f2d49dfaf368f
Parents: 90ed2ef
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Aug 16 19:01:48 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:01 2017 +0800
----------------------------------------------------------------------
.../BeamSdkRepackUtilsSerializer.java | 34 ++++++++++++++++++++
.../serialization/GuavaUtilsSerializer.java | 34 ++++++++++++++++++++
2 files changed, 68 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
index 4ae47eb..2912194 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
@@ -24,12 +24,14 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import java.util.EnumMap;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
@@ -244,10 +246,42 @@ public class BeamSdkRepackUtilsSerializer {
ImmutableSetSerializer.class);
}
+ /**
+ * Specific serializer of {@link Kryo} for UnmodifiableIterable.
+ */
+ public static class UnmodifiableIterableSerializer extends Serializer<Iterable<Object>> {
+
+ @Override
+ public void write(Kryo kryo, Output output, Iterable<Object> object) {
+ int size = Iterables.size(object);
+ output.writeInt(size, true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
+ }
+ }
+
+ @Override
+ public Iterable<Object> read(Kryo kryo, Input input, Class<Iterable<Object>> type) {
+ final int size = input.readInt(true);
+ List<Object> iterable = Lists.newArrayList();
+ for (int i = 0; i < size; ++i) {
+ iterable.add(kryo.readClassAndObject(input));
+ }
+ return Iterables.unmodifiableIterable(iterable);
+ }
+ }
+
+ private static void registerUnmodifiableIterablesSerializers(Config config) {
+ config.registerSerialization(
+ Iterables.unmodifiableIterable(Lists.newArrayList()).getClass(),
+ UnmodifiableIterableSerializer.class);
+ }
+
public static void registerSerializers(Config config) {
registerImmutableListSerializers(config);
registerImmutableMapSerializers(config);
registerImmutableSetSerializers(config);
+ registerUnmodifiableIterablesSerializers(config);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
index e6f750c..ee83aa6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -34,6 +35,7 @@ import com.google.common.collect.Table;
import java.util.EnumMap;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -244,9 +246,41 @@ public class GuavaUtilsSerializer {
ImmutableSetSerializer.class);
}
+ /**
+ * Specific serializer of {@link Kryo} for UnmodifiableIterable.
+ */
+ public static class UnmodifiableIterableSerializer extends Serializer<Iterable<Object>> {
+
+ @Override
+ public void write(Kryo kryo, Output output, Iterable<Object> object) {
+ int size = Iterables.size(object);
+ output.writeInt(size, true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
+ }
+ }
+
+ @Override
+ public Iterable<Object> read(Kryo kryo, Input input, Class<Iterable<Object>> type) {
+ final int size = input.readInt(true);
+ List<Object> iterable = Lists.newArrayList();
+ for (int i = 0; i < size; ++i) {
+ iterable.add(kryo.readClassAndObject(input));
+ }
+ return Iterables.unmodifiableIterable(iterable);
+ }
+ }
+
+ private static void registerUnmodifiableIterablesSerializers(Config config) {
+ config.registerSerialization(
+ Iterables.unmodifiableIterable(Lists.newArrayList()).getClass(),
+ UnmodifiableIterableSerializer.class);
+ }
+
public static void registerSerializers(Config config) {
registerImmutableListSerializers(config);
registerImmutableMapSerializers(config);
registerImmutableSetSerializers(config);
+ registerUnmodifiableIterablesSerializers(config);
}
}