You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:50 UTC

[50/53] [abbrv] beam git commit: jstorm-runner: Add Kryo serializer for UnmodifiableIterable

jstorm-runner: Add Kryo serializer for UnmodifiableIterable


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e808730
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e808730
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e808730

Branch: refs/heads/jstorm-runner
Commit: 9e8087306b5562fdecf678979b9f2d49dfaf368f
Parents: 90ed2ef
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Aug 16 19:01:48 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:01 2017 +0800

----------------------------------------------------------------------
 .../BeamSdkRepackUtilsSerializer.java           | 34 ++++++++++++++++++++
 .../serialization/GuavaUtilsSerializer.java     | 34 ++++++++++++++++++++
 2 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
index 4ae47eb..2912194 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
@@ -24,12 +24,14 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
 import java.util.EnumMap;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
@@ -244,10 +246,42 @@ public class BeamSdkRepackUtilsSerializer {
         ImmutableSetSerializer.class);
   }
 
+  /**
+   * Specific serializer of {@link Kryo} for the iterable returned by
+   * {@code Iterables.unmodifiableIterable(...)}.
+   *
+   * <p>Serialized form: the element count, written with {@code writeInt(count, true)}, followed
+   * by each element written with {@code kryo.writeClassAndObject}.
+   */
+  public static class UnmodifiableIterableSerializer extends Serializer<Iterable<Object>> {
+
+    /**
+     * Writes the element count followed by every element of {@code object}.
+     *
+     * <p>The iterable is copied into a list first so that it is traversed exactly once. The
+     * written count therefore always matches the number of elements written, even if a second
+     * traversal of {@code object} would have produced a different number of elements.
+     *
+     * @param kryo the Kryo instance used to write each element together with its class
+     * @param output the output to write to
+     * @param object the iterable to serialize
+     */
+    @Override
+    public void write(Kryo kryo, Output output, Iterable<Object> object) {
+      // Snapshot once: counting and writing must see the same elements.
+      List<Object> elements = Lists.newArrayList(object);
+      output.writeInt(elements.size(), true);
+      for (Object element : elements) {
+        kryo.writeClassAndObject(output, element);
+      }
+    }
+
+    /**
+     * Reads the element count and then that many elements, and returns them wrapped as an
+     * unmodifiable iterable.
+     *
+     * @param kryo the Kryo instance used to read each element together with its class
+     * @param input the input to read from
+     * @param type the registered type; not used by this implementation
+     * @return an unmodifiable iterable over the deserialized elements, in the order read
+     */
+    @Override
+    public Iterable<Object> read(Kryo kryo, Input input, Class<Iterable<Object>> type) {
+      final int size = input.readInt(true);
+      List<Object> elements = Lists.newArrayList();
+      for (int i = 0; i < size; ++i) {
+        elements.add(kryo.readClassAndObject(input));
+      }
+      return Iterables.unmodifiableIterable(elements);
+    }
+  }
+
+  /**
+   * Registers {@link UnmodifiableIterableSerializer} for the runtime class of the object
+   * returned by {@code Iterables.unmodifiableIterable(...)}.
+   *
+   * @param config the configuration to register the serializer with
+   */
+  private static void registerUnmodifiableIterablesSerializers(Config config) {
+    // The wrapper class is looked up from a sample instance rather than named directly
+    // (presumably because it is not publicly accessible -- confirm against Guava).
+    Class<?> wrapperClass = Iterables.unmodifiableIterable(Lists.newArrayList()).getClass();
+    config.registerSerialization(wrapperClass, UnmodifiableIterableSerializer.class);
+  }
+
   /**
    * Registers the serializers provided by this class with the given config by delegating, in
    * order, to the immutable list, immutable map, immutable set and unmodifiable iterable
    * registration helpers.
    *
    * @param config the configuration that receives the serializer registrations
    */
   public static void registerSerializers(Config config) {
     registerImmutableListSerializers(config);
     registerImmutableMapSerializers(config);
     registerImmutableSetSerializers(config);
+    registerUnmodifiableIterablesSerializers(config);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
index e6f750c..ee83aa6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -34,6 +35,7 @@ import com.google.common.collect.Table;
 
 import java.util.EnumMap;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -244,9 +246,41 @@ public class GuavaUtilsSerializer {
         ImmutableSetSerializer.class);
   }
 
+  /**
+   * Specific serializer of {@link Kryo} for the iterable returned by
+   * {@code Iterables.unmodifiableIterable(...)}.
+   *
+   * <p>Serialized form: the element count, written with {@code writeInt(count, true)}, followed
+   * by each element written with {@code kryo.writeClassAndObject}.
+   */
+  public static class UnmodifiableIterableSerializer extends Serializer<Iterable<Object>> {
+
+    /**
+     * Writes the element count followed by every element of {@code object}.
+     *
+     * <p>The iterable is copied into a list first so that it is traversed exactly once. The
+     * written count therefore always matches the number of elements written, even if a second
+     * traversal of {@code object} would have produced a different number of elements.
+     *
+     * @param kryo the Kryo instance used to write each element together with its class
+     * @param output the output to write to
+     * @param object the iterable to serialize
+     */
+    @Override
+    public void write(Kryo kryo, Output output, Iterable<Object> object) {
+      // Snapshot once: counting and writing must see the same elements.
+      List<Object> elements = Lists.newArrayList(object);
+      output.writeInt(elements.size(), true);
+      for (Object element : elements) {
+        kryo.writeClassAndObject(output, element);
+      }
+    }
+
+    /**
+     * Reads the element count and then that many elements, and returns them wrapped as an
+     * unmodifiable iterable.
+     *
+     * @param kryo the Kryo instance used to read each element together with its class
+     * @param input the input to read from
+     * @param type the registered type; not used by this implementation
+     * @return an unmodifiable iterable over the deserialized elements, in the order read
+     */
+    @Override
+    public Iterable<Object> read(Kryo kryo, Input input, Class<Iterable<Object>> type) {
+      final int size = input.readInt(true);
+      List<Object> elements = Lists.newArrayList();
+      for (int i = 0; i < size; ++i) {
+        elements.add(kryo.readClassAndObject(input));
+      }
+      return Iterables.unmodifiableIterable(elements);
+    }
+  }
+
+  /**
+   * Registers {@link UnmodifiableIterableSerializer} for the runtime class of the object
+   * returned by {@code Iterables.unmodifiableIterable(...)}.
+   *
+   * @param config the configuration to register the serializer with
+   */
+  private static void registerUnmodifiableIterablesSerializers(Config config) {
+    // The wrapper class is looked up from a sample instance rather than named directly
+    // (presumably because it is not publicly accessible -- confirm against Guava).
+    Class<?> wrapperClass = Iterables.unmodifiableIterable(Lists.newArrayList()).getClass();
+    config.registerSerialization(wrapperClass, UnmodifiableIterableSerializer.class);
+  }
+
   /**
    * Registers the serializers provided by this class with the given config by delegating, in
    * order, to the immutable list, immutable map, immutable set and unmodifiable iterable
    * registration helpers.
    *
    * @param config the configuration that receives the serializer registrations
    */
   public static void registerSerializers(Config config) {
     registerImmutableListSerializers(config);
     registerImmutableMapSerializers(config);
     registerImmutableSetSerializers(config);
+    registerUnmodifiableIterablesSerializers(config);
   }
 }