You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 20:02:53 UTC

[11/13] storm git commit: STORM-764: Have option to compress thrift heartbeat

STORM-764: Have option to compress thrift heartbeat


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9bd0eb0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9bd0eb0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9bd0eb0

Branch: refs/heads/0.10.x-branch
Commit: c9bd0eb01b97fce7e2b9325ef4639973c9d3cbd4
Parents: 155e52a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 8 14:03:56 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 13:52:21 2015 -0400

----------------------------------------------------------------------
 .../GzipBridgeThriftSerializationDelegate.java  | 65 ++++++++++++++++++
 .../GzipThriftSerializationDelegate.java        | 57 ++++++++++++++++
 .../src/jvm/backtype/storm/utils/Utils.java     | 32 +++++++++
 ...ipBridgeThriftSerializationDelegateTest.java | 71 ++++++++++++++++++++
 4 files changed, 225 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c9bd0eb0/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
new file mode 100644
index 0000000..bb3ad7e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.serialization;
+
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses
+ * {@link backtype.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled
+ * via {@link #prepare(java.util.Map)} is passed through to both delegates.
+ */
+@Deprecated
+public class GzipBridgeThriftSerializationDelegate implements SerializationDelegate {
+
+    private ThriftSerializationDelegate defaultDelegate = new ThriftSerializationDelegate();
+    private GzipThriftSerializationDelegate gzipDelegate = new GzipThriftSerializationDelegate();
+
+    @Override
+    public void prepare(Map stormConf) {
+        defaultDelegate.prepare(stormConf);
+        gzipDelegate.prepare(stormConf);
+    }
+
+    @Override
+    public byte[] serialize(Object object) {
+        return gzipDelegate.serialize(object);
+    }
+
+    @Override
+    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
+        if (isGzipped(bytes)) {
+            return gzipDelegate.deserialize(bytes, clazz);
+        } else {
+            return defaultDelegate.deserialize(bytes,clazz);
+        }
+    }
+
+    // Split up GZIP_MAGIC into readable bytes
+    private static final byte GZIP_MAGIC_FIRST_BYTE = (byte) GZIPInputStream.GZIP_MAGIC;
+    private static final byte GZIP_MAGIC_SECOND_BYTE = (byte) (GZIPInputStream.GZIP_MAGIC >> 8);
+
+    /**
+     * Looks ahead to see if the GZIP magic constant is heading {@code bytes}
+     */
+    private boolean isGzipped(byte[] bytes) {
+        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
+               && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bd0eb0/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
new file mode 100644
index 0000000..933a125
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.serialization;
+
+import java.io.IOException;
+import java.util.Map;
+import backtype.storm.utils.Utils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+/**
+ * Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
+ */
+public class GzipThriftSerializationDelegate implements SerializationDelegate {
+
+    @Override
+    public void prepare(Map stormConf) {
+        // No-op
+    }
+
+    @Override
+    public byte[] serialize(Object object) {
+        try {
+            return Utils.gzip(new TSerializer().serialize((TBase) object));
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
+        try {
+            TBase instance = (TBase) clazz.newInstance();
+            new TDeserializer().deserialize(instance, Utils.gunzip(bytes));
+            return (T)instance;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bd0eb0/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 4123f73..d0046a9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -46,6 +46,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class Utils {
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -101,6 +103,36 @@ public class Utils {
         }
     }
 
+    public static byte[] gzip(byte[] data) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            GZIPOutputStream out = new GZIPOutputStream(bos);
+            out.write(data);
+            out.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static byte[] gunzip(byte[] data) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            ByteArrayInputStream bis = new ByteArrayInputStream(data);
+            GZIPInputStream in = new GZIPInputStream(bis);
+            byte[] buffer = new byte[1024];
+            int len = 0;
+            while ((len = in.read(buffer)) >= 0) {
+                bos.write(buffer, 0, len);
+            }
+            in.close();
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static <T> String join(Iterable<T> coll, String sep) {
         Iterator<T> it = coll.iterator();
         String ret = "";

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bd0eb0/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegateTest.java
new file mode 100644
index 0000000..53bdab6
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeThriftSerializationDelegateTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.serialization;
+
+import static org.junit.Assert.*;
+
+import java.io.Serializable;
+import org.junit.Test;
+import org.junit.Before;
+import backtype.storm.generated.GlobalStreamId;
+
+
+public class GzipBridgeThriftSerializationDelegateTest {
+    SerializationDelegate testDelegate;
+
+    @Before
+    public void setUp() throws Exception {
+        testDelegate = new GzipBridgeThriftSerializationDelegate();
+    }
+
+    @Test
+    public void testDeserialize_readingFromGzip() throws Exception {
+        GlobalStreamId id = new GlobalStreamId("first", "second");
+
+        byte[] serialized = new GzipThriftSerializationDelegate().serialize(id);
+
+        GlobalStreamId id2 = testDelegate.deserialize(serialized, GlobalStreamId.class);
+
+        assertEquals(id2.get_componentId(), id.get_componentId());
+        assertEquals(id2.get_streamId(), id.get_streamId());
+    }
+
+    @Test
+    public void testDeserialize_readingFromGzipBridge() throws Exception {
+        GlobalStreamId id = new GlobalStreamId("first", "second");
+
+        byte[] serialized = new GzipBridgeThriftSerializationDelegate().serialize(id);
+
+        GlobalStreamId id2 = testDelegate.deserialize(serialized, GlobalStreamId.class);
+
+        assertEquals(id2.get_componentId(), id.get_componentId());
+        assertEquals(id2.get_streamId(), id.get_streamId());
+    }
+
+    @Test
+    public void testDeserialize_readingFromDefault() throws Exception {
+        GlobalStreamId id = new GlobalStreamId("A", "B");
+
+        byte[] serialized = new ThriftSerializationDelegate().serialize(id);
+
+        GlobalStreamId id2 = testDelegate.deserialize(serialized, GlobalStreamId.class);
+
+        assertEquals(id2.get_componentId(), id.get_componentId());
+        assertEquals(id2.get_streamId(), id.get_streamId());
+    }
+}