You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/03/18 20:39:20 UTC

[13/16] storm git commit: Deprecating DefaultSerializationDelegate and GzipBridgeSerializationDelegate, and removing ThriftSerializationDelegateBridge, in favor of ThriftSerializationDelegate.

Deprecating DefaultSerializationDelegate and GzipBridgeSerializationDelegate, and removing ThriftSerializationDelegateBridge, in favor of ThriftSerializationDelegate.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/47bcf3eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/47bcf3eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/47bcf3eb

Branch: refs/heads/master
Commit: 47bcf3ebe7067e86846057cbca2466d7349b0d98
Parents: 5a66bb6
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Mar 11 14:06:49 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Mar 11 14:06:49 2015 -0700

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |  2 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  4 +-
 .../storm/coordination/BatchBoltExecutor.java   |  2 +-
 .../DefaultSerializationDelegate.java           |  1 +
 .../GzipBridgeSerializationDelegate.java        |  1 +
 .../ThriftSerializationDelegateBridge.java      | 51 --------------------
 .../storm/topology/TopologyBuilder.java         |  2 +-
 .../src/jvm/backtype/storm/utils/Utils.java     | 23 +++++++--
 .../ThriftBridgeSerializationDelegateTest.java  | 21 +-------
 10 files changed, 29 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 0255a55..dfdb54b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -47,7 +47,7 @@ storm.auth.simple-white-list.users: []
 storm.auth.simple-acl.users: []
 storm.auth.simple-acl.users.commands: []
 storm.auth.simple-acl.admins: []
-storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegateBridge"
+storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegate"
 
 ### nimbus.* configs are for the master
 nimbus.host: "localhost"

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 0e083ea..7f688ed 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -96,7 +96,7 @@
         (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
           (mk-custom-grouper grouping context component-id stream-id target-tasks))
       :custom-serialized
-        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping) Serializable)]
+        (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)]
           (mk-custom-grouper grouping context component-id stream-id target-tasks))
       :direct
         :direct

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index fdc54cc..9112365 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -107,7 +107,7 @@
 (defn- read-storm-conf [conf storm-id]
   (let [stormroot (master-stormdist-root conf storm-id)]
     (merge conf
-           (Utils/deserialize
+           (Utils/javaDeserialize
             (FileUtils/readFileToByteArray
              (File. (master-stormconf-path stormroot))
              ) java.util.Map))))
@@ -322,7 +322,7 @@
    (FileUtils/cleanDirectory (File. stormroot))
    (setup-jar conf tmp-jar-location stormroot)
    (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
+   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/javaSerialize storm-conf))
    ))
 
 (defn- read-storm-topology [conf storm-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
index 3d3208b..89ef028 100644
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
@@ -41,7 +41,7 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
     BatchOutputCollectorImpl _collector;
     
     public BatchBoltExecutor(IBatchBolt bolt) {
-        _boltSer = Utils.serialize(bolt);
+        _boltSer = Utils.javaSerialize(bolt);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
index 913baa6..6d986af 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
@@ -20,6 +20,7 @@ package backtype.storm.serialization;
 import java.io.*;
 import java.util.Map;
 
+@Deprecated
 public class DefaultSerializationDelegate implements SerializationDelegate {
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
index 1306ea9..c8377c3 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
@@ -25,6 +25,7 @@ import java.util.zip.GZIPInputStream;
  * {@link backtype.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled
  * via {@link #prepare(java.util.Map)} is passed through to both delegates.
  */
+@Deprecated
 public class GzipBridgeSerializationDelegate implements SerializationDelegate {
 
     private DefaultSerializationDelegate defaultDelegate = new DefaultSerializationDelegate();

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java b/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java
deleted file mode 100644
index 8b8c95d..0000000
--- a/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.serialization;
-
-import org.apache.thrift.TBase;
-
-import java.util.Map;
-
-public class ThriftSerializationDelegateBridge implements SerializationDelegate {
-    private SerializationDelegate thriftSerializationDelegate = new ThriftSerializationDelegate();
-    private SerializationDelegate defaultSerializationDelegate = new DefaultSerializationDelegate();
-
-    @Override
-    public void prepare(Map stormConf) {
-        this.thriftSerializationDelegate.prepare(stormConf);
-        this.defaultSerializationDelegate.prepare(stormConf);
-    }
-
-    @Override
-    public byte[] serialize(Object object) {
-        if(object instanceof TBase) {
-            return thriftSerializationDelegate.serialize(object);
-        } else {
-            return defaultSerializationDelegate.serialize(object);
-        }
-    }
-
-    @Override
-    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
-        if(TBase.class.isAssignableFrom(clazz)) {
-            return thriftSerializationDelegate.deserialize(bytes, clazz);
-        } else {
-            return defaultSerializationDelegate.deserialize(bytes, clazz);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 9d8f271..806549a 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -348,7 +348,7 @@ public class TopologyBuilder {
 
         @Override
         public BoltDeclarer customGrouping(String componentId, String streamId, CustomStreamGrouping grouping) {
-            return grouping(componentId, streamId, Grouping.custom_serialized(Utils.serialize(grouping)));
+            return grouping(componentId, streamId, Grouping.custom_serialized(Utils.javaSerialize(grouping)));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index ba1a2ab..4123f73 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -52,7 +52,6 @@ public class Utils {
     public static final String DEFAULT_STREAM_ID = "default";
 
     private static SerializationDelegate serializationDelegate;
-    private static final DefaultSerializationDelegate javaSerializationDelegate = new DefaultSerializationDelegate();
 
     static {
         Map conf = readStormConfig();
@@ -77,11 +76,29 @@ public class Utils {
     }
 
     public static byte[] javaSerialize(Object obj) {
-        return javaSerializationDelegate.serialize(obj);
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(bos);
+            oos.writeObject(obj);
+            oos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
-        return javaSerializationDelegate.deserialize(serialized, clazz);
+        try {
+            ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+            ObjectInputStream ois = new ObjectInputStream(bis);
+            Object ret = ois.readObject();
+            ois.close();
+            return (T)ret;
+        } catch(IOException ioe) {
+            throw new RuntimeException(ioe);
+        } catch(ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public static <T> String join(Iterable<T> coll, String sep) {

http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
index ef17017..b408a80 100644
--- a/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
+++ b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
@@ -32,30 +32,11 @@ public class ThriftBridgeSerializationDelegateTest {
 
     @Before
     public void setUp() throws Exception {
-        testDelegate = new ThriftSerializationDelegateBridge();
+        testDelegate = new ThriftSerializationDelegate();
         testDelegate.prepare(null);
     }
 
     @Test
-    public void testNonThriftInstance() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "foo";
-        pojo.age = 100;
-
-        byte[] serialized = new DefaultSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-
-        serialized = testDelegate.serialize(pojo);
-        pojo2 = (TestPojo) new DefaultSerializationDelegate().deserialize(serialized, Serializable.class);
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    @Test
     public void testThriftInstance() throws Exception {
         ErrorInfo errorInfo = new ErrorInfo();
         errorInfo.set_error("error");