You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/03/18 20:39:20 UTC
[13/16] storm git commit: Deprecating DefaultSerializationDelegate and GzipBridgeSerializationDelegate in favor of ThriftSerializationDelegate.
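Deprecating DefaultSerializationDelegate and GzipBridgeSerializationDelegate, and removing ThriftSerializationDelegateBridge, in favor of ThriftSerializationDelegate. Call sites that serialize non-Thrift objects (custom groupings, the topology conf map, batch bolts) now use Utils.javaSerialize/javaDeserialize.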
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/47bcf3eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/47bcf3eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/47bcf3eb
Branch: refs/heads/master
Commit: 47bcf3ebe7067e86846057cbca2466d7349b0d98
Parents: 5a66bb6
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Mar 11 14:06:49 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Mar 11 14:06:49 2015 -0700
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../src/clj/backtype/storm/daemon/executor.clj | 2 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 4 +-
.../storm/coordination/BatchBoltExecutor.java | 2 +-
.../DefaultSerializationDelegate.java | 1 +
.../GzipBridgeSerializationDelegate.java | 1 +
.../ThriftSerializationDelegateBridge.java | 51 --------------------
.../storm/topology/TopologyBuilder.java | 2 +-
.../src/jvm/backtype/storm/utils/Utils.java | 23 +++++++--
.../ThriftBridgeSerializationDelegateTest.java | 21 +-------
10 files changed, 29 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 0255a55..dfdb54b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -47,7 +47,7 @@ storm.auth.simple-white-list.users: []
storm.auth.simple-acl.users: []
storm.auth.simple-acl.users.commands: []
storm.auth.simple-acl.admins: []
-storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegateBridge"
+storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegate"
### nimbus.* configs are for the master
nimbus.host: "localhost"
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 0e083ea..7f688ed 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -96,7 +96,7 @@
(let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
(mk-custom-grouper grouping context component-id stream-id target-tasks))
:custom-serialized
- (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping) Serializable)]
+ (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)]
(mk-custom-grouper grouping context component-id stream-id target-tasks))
:direct
:direct
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index fdc54cc..9112365 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -107,7 +107,7 @@
(defn- read-storm-conf [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(merge conf
- (Utils/deserialize
+ (Utils/javaDeserialize
(FileUtils/readFileToByteArray
(File. (master-stormconf-path stormroot))
) java.util.Map))))
@@ -322,7 +322,7 @@
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
- (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
+ (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/javaSerialize storm-conf))
))
(defn- read-storm-topology [conf storm-id]
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
index 3d3208b..89ef028 100644
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
@@ -41,7 +41,7 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
BatchOutputCollectorImpl _collector;
public BatchBoltExecutor(IBatchBolt bolt) {
- _boltSer = Utils.serialize(bolt);
+ _boltSer = Utils.javaSerialize(bolt);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
index 913baa6..6d986af 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/DefaultSerializationDelegate.java
@@ -20,6 +20,7 @@ package backtype.storm.serialization;
import java.io.*;
import java.util.Map;
+@Deprecated
public class DefaultSerializationDelegate implements SerializationDelegate {
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
index 1306ea9..c8377c3 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
@@ -25,6 +25,7 @@ import java.util.zip.GZIPInputStream;
* {@link backtype.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled
* via {@link #prepare(java.util.Map)} is passed through to both delegates.
*/
+@Deprecated
public class GzipBridgeSerializationDelegate implements SerializationDelegate {
private DefaultSerializationDelegate defaultDelegate = new DefaultSerializationDelegate();
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java b/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java
deleted file mode 100644
index 8b8c95d..0000000
--- a/storm-core/src/jvm/backtype/storm/serialization/ThriftSerializationDelegateBridge.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.serialization;
-
-import org.apache.thrift.TBase;
-
-import java.util.Map;
-
-public class ThriftSerializationDelegateBridge implements SerializationDelegate {
- private SerializationDelegate thriftSerializationDelegate = new ThriftSerializationDelegate();
- private SerializationDelegate defaultSerializationDelegate = new DefaultSerializationDelegate();
-
- @Override
- public void prepare(Map stormConf) {
- this.thriftSerializationDelegate.prepare(stormConf);
- this.defaultSerializationDelegate.prepare(stormConf);
- }
-
- @Override
- public byte[] serialize(Object object) {
- if(object instanceof TBase) {
- return thriftSerializationDelegate.serialize(object);
- } else {
- return defaultSerializationDelegate.serialize(object);
- }
- }
-
- @Override
- public <T> T deserialize(byte[] bytes, Class<T> clazz) {
- if(TBase.class.isAssignableFrom(clazz)) {
- return thriftSerializationDelegate.deserialize(bytes, clazz);
- } else {
- return defaultSerializationDelegate.deserialize(bytes, clazz);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 9d8f271..806549a 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -348,7 +348,7 @@ public class TopologyBuilder {
@Override
public BoltDeclarer customGrouping(String componentId, String streamId, CustomStreamGrouping grouping) {
- return grouping(componentId, streamId, Grouping.custom_serialized(Utils.serialize(grouping)));
+ return grouping(componentId, streamId, Grouping.custom_serialized(Utils.javaSerialize(grouping)));
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index ba1a2ab..4123f73 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -52,7 +52,6 @@ public class Utils {
public static final String DEFAULT_STREAM_ID = "default";
private static SerializationDelegate serializationDelegate;
- private static final DefaultSerializationDelegate javaSerializationDelegate = new DefaultSerializationDelegate();
static {
Map conf = readStormConfig();
@@ -77,11 +76,29 @@ public class Utils {
}
public static byte[] javaSerialize(Object obj) {
- return javaSerializationDelegate.serialize(obj);
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(obj);
+ oos.close();
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
- return javaSerializationDelegate.deserialize(serialized, clazz);
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ Object ret = ois.readObject();
+ ois.close();
+ return (T)ret;
+ } catch(IOException ioe) {
+ throw new RuntimeException(ioe);
+ } catch(ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
public static <T> String join(Iterable<T> coll, String sep) {
http://git-wip-us.apache.org/repos/asf/storm/blob/47bcf3eb/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
index ef17017..b408a80 100644
--- a/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
+++ b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
@@ -32,30 +32,11 @@ public class ThriftBridgeSerializationDelegateTest {
@Before
public void setUp() throws Exception {
- testDelegate = new ThriftSerializationDelegateBridge();
+ testDelegate = new ThriftSerializationDelegate();
testDelegate.prepare(null);
}
@Test
- public void testNonThriftInstance() throws Exception {
- TestPojo pojo = new TestPojo();
- pojo.name = "foo";
- pojo.age = 100;
-
- byte[] serialized = new DefaultSerializationDelegate().serialize(pojo);
-
- TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class);
-
- assertEquals(pojo2.name, pojo.name);
- assertEquals(pojo2.age, pojo.age);
-
- serialized = testDelegate.serialize(pojo);
- pojo2 = (TestPojo) new DefaultSerializationDelegate().deserialize(serialized, Serializable.class);
- assertEquals(pojo2.name, pojo.name);
- assertEquals(pojo2.age, pojo.age);
- }
-
- @Test
public void testThriftInstance() throws Exception {
ErrorInfo errorInfo = new ErrorInfo();
errorInfo.set_error("error");