You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/25 20:57:49 UTC

[1/3] storm git commit: STORM-1220. Avoid double copying in the Kafka spout.

Repository: storm
Updated Branches:
  refs/heads/master 20a864d08 -> 01bab8654


STORM-1220. Avoid double copying in the Kafka spout.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/35f1da78
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/35f1da78
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/35f1da78

Branch: refs/heads/master
Commit: 35f1da7890b597c9634904c0589f6ac64584e539
Parents: a8d253a
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Nov 19 13:32:20 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Nov 24 13:23:16 2015 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         |  8 ++--
 .../src/jvm/storm/kafka/KeyValueScheme.java     |  5 +--
 .../kafka/KeyValueSchemeAsMultiScheme.java      |  5 ++-
 .../jvm/storm/kafka/MessageMetadataScheme.java  |  6 ++-
 .../MessageMetadataSchemeAsMultiScheme.java     |  3 +-
 .../jvm/storm/kafka/StringKeyValueScheme.java   |  3 +-
 .../kafka/StringMessageAndMetadataScheme.java   |  7 ++--
 .../storm/kafka/StringMultiSchemeWithTopic.java | 21 +++-------
 .../src/jvm/storm/kafka/StringScheme.java       | 20 ++++++----
 .../storm/kafka/StringKeyValueSchemeTest.java   | 17 ++++++---
 .../src/test/storm/kafka/TestStringScheme.java  | 40 ++++++++++++++++++++
 .../jvm/backtype/storm/spout/MultiScheme.java   |  3 +-
 .../backtype/storm/spout/RawMultiScheme.java    |  3 +-
 .../src/jvm/backtype/storm/spout/RawScheme.java |  9 ++++-
 .../src/jvm/backtype/storm/spout/Scheme.java    |  3 +-
 .../storm/spout/SchemeAsMultiScheme.java        |  3 +-
 16 files changed, 106 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index cd684df..52cdde8 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -221,12 +221,12 @@ public class KafkaUtils {
         }
         ByteBuffer key = msg.key();
         if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
-            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
+            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
         } else {
             if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
-                tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, Utils.toByteArray(payload));
+                tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload);
             } else {
-                tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
+                tups = kafkaConfig.scheme.deserialize(payload);
             }
         }
         return tups;
@@ -237,7 +237,7 @@ public class KafkaUtils {
         if (payload == null) {
             return null;
         }
-        return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
+        return scheme.deserializeMessageWithMetadata(payload, partition, offset);
     }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
index f42f7c8..7c0dc6c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
@@ -19,10 +19,9 @@ package storm.kafka;
 
 import backtype.storm.spout.Scheme;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public interface KeyValueScheme extends Scheme {
-
-    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value);
-
+    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
index 7def6ac..d27ae7e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -19,16 +19,17 @@ package storm.kafka;
 
 import backtype.storm.spout.SchemeAsMultiScheme;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
-public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{
+public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
 
     public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
         super(scheme);
     }
 
-    public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) {
+    public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) {
         List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
         if(o == null) return null;
         else return Arrays.asList(o);

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
index 92a5598..62f652f 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
@@ -17,9 +17,11 @@
  */
 package storm.kafka;
 
-import java.util.List;
 import backtype.storm.spout.Scheme;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 public interface MessageMetadataScheme extends Scheme {
-    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset);
+    List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index 0567809..f23a101 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -17,6 +17,7 @@
  */
 package storm.kafka;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -29,7 +30,7 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
         super(scheme);
     }
 
-    public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
+    public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
         List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
         if (o == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
index 41cacb6..6f6d339 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
@@ -20,12 +20,13 @@ package storm.kafka;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.ImmutableMap;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
 
     @Override
-    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
+    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
         if ( key == null ) {
             return deserialize(value);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
index 031d497..1708b97 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
@@ -17,11 +17,12 @@
  */
 package storm.kafka;
 
-import java.util.List;
-
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme {
     private static final long serialVersionUID = -5441841920447947374L;
 
@@ -29,7 +30,7 @@ public class StringMessageAndMetadataScheme extends StringScheme implements Mess
     public static final String STRING_SCHEME_OFFSET = "offset";
 
     @Override
-    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
+    public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
         String stringMessage = StringScheme.deserializeString(message);
         return new Values(stringMessage, partition.partition, offset);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java b/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
index e0da2ce..1e7f216 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
@@ -18,13 +18,12 @@
 package storm.kafka;
 
 import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 public class StringMultiSchemeWithTopic
@@ -34,24 +33,16 @@ public class StringMultiSchemeWithTopic
     public static final String TOPIC_KEY = "topic";
 
     @Override
-    public Iterable<List<Object>> deserialize(byte[] bytes) {
+    public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
         throw new NotImplementedException();
     }
 
-    public Iterable<List<Object>> deserializeWithTopic(String topic, byte[] bytes) {
-        List<Object> items = new Values(deserializeString(bytes), topic);
-        return Arrays.asList(items);
+    public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) {
+        List<Object> items = new Values(StringScheme.deserializeString(bytes), topic);
+        return Collections.singletonList(items);
     }
 
     public Fields getOutputFields() {
         return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
     }
-
-    public static String deserializeString(byte[] string) {
-        try {
-            return new String(string, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
index 286dc9b..1071e60 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
@@ -20,23 +20,27 @@ package storm.kafka;
 import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 
-import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 public class StringScheme implements Scheme {
-
+    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
     public static final String STRING_SCHEME_KEY = "str";
 
-    public List<Object> deserialize(byte[] bytes) {
+    public List<Object> deserialize(ByteBuffer bytes) {
         return new Values(deserializeString(bytes));
     }
 
-    public static String deserializeString(byte[] string) {
-        try {
-            return new String(string, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
+    public static String deserializeString(ByteBuffer string) {
+        if (string.hasArray()) {
+            int base = string.arrayOffset();
+            return new String(string.array(), base + string.position(), string.remaining());
+        } else {
+            return new String(Utils.toByteArray(string), UTF8_CHARSET);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
index 0b786ba..eddb900 100644
--- a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
@@ -21,7 +21,9 @@ import backtype.storm.tuple.Fields;
 import com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 
-import java.util.Arrays;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -32,7 +34,7 @@ public class StringKeyValueSchemeTest {
 
     @Test
     public void testDeserialize() throws Exception {
-        assertEquals(Arrays.asList("test"), scheme.deserialize("test".getBytes()));
+        assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test")));
     }
 
     @Test
@@ -44,12 +46,17 @@ public class StringKeyValueSchemeTest {
 
     @Test
     public void testDeserializeWithNullKeyAndValue() throws Exception {
-        assertEquals(Arrays.asList("test"), scheme.deserializeKeyAndValue(null, "test".getBytes()));
+        assertEquals(Collections.singletonList("test"),
+            scheme.deserializeKeyAndValue(null, wrapString("test")));
     }
 
     @Test
     public void testDeserializeWithKeyAndValue() throws Exception {
-        assertEquals(Arrays.asList(ImmutableMap.of("key", "test")),
-                scheme.deserializeKeyAndValue("key".getBytes(), "test".getBytes()));
+        assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")),
+                scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test")));
+    }
+
+    private static ByteBuffer wrapString(String s) {
+        return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
new file mode 100644
index 0000000..ae36409
--- /dev/null
+++ b/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStringScheme {
+  @Test
+  public void testDeserializeString() {
+    String s = "foo";
+    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+    ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
+    direct.put(bytes);
+    direct.flip();
+    String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes));
+    String s2 = StringScheme.deserializeString(direct);
+    assertEquals(s, s1);
+    assertEquals(s, s2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java b/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
index ca2ce91..57bf4ce 100644
--- a/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
@@ -17,12 +17,13 @@
  */
 package backtype.storm.spout;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.io.Serializable;
 
 import backtype.storm.tuple.Fields;
 
 public interface MultiScheme extends Serializable {
-  public Iterable<List<Object>> deserialize(byte[] ser);
+  public Iterable<List<Object>> deserialize(ByteBuffer ser);
   public Fields getOutputFields();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java b/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
index 7f73975..824d16c 100644
--- a/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.spout;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import backtype.storm.tuple.Fields;
@@ -27,7 +28,7 @@ import static java.util.Arrays.asList;
 
 public class RawMultiScheme implements MultiScheme {
   @Override
-  public Iterable<List<Object>> deserialize(byte[] ser) {
+  public Iterable<List<Object>> deserialize(ByteBuffer ser) {
     return asList(tuple(ser));
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/RawScheme.java b/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
index 7e26770..937acb7 100644
--- a/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
@@ -18,12 +18,17 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-    public List<Object> deserialize(byte[] ser) {
-        return tuple(ser);
+    public List<Object> deserialize(ByteBuffer ser) {
+        // Maintain backward compatibility for 0.10
+        byte[] b = Utils.toByteArray(ser);
+        return tuple(new Object[]{b});
     }
 
     public Fields getOutputFields() {

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/Scheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/Scheme.java b/storm-core/src/jvm/backtype/storm/spout/Scheme.java
index ca68954..d696a9c 100644
--- a/storm-core/src/jvm/backtype/storm/spout/Scheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/Scheme.java
@@ -19,10 +19,11 @@ package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 
 public interface Scheme extends Serializable {
-    public List<Object> deserialize(byte[] ser);
+    List<Object> deserialize(ByteBuffer ser);
     public Fields getOutputFields();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java b/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
index 29f7fce..a49d55f 100644
--- a/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.spout;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -29,7 +30,7 @@ public class SchemeAsMultiScheme implements MultiScheme {
     this.scheme = scheme;
   }
 
-  @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
+  @Override public Iterable<List<Object>> deserialize(final ByteBuffer ser) {
     List<Object> o = scheme.deserialize(ser);
     if(o == null) return null;
     else return Arrays.asList(o);


[3/3] storm git commit: Added STORM-1220 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1220 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01bab865
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01bab865
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01bab865

Branch: refs/heads/master
Commit: 01bab865400df5dfce3909f8b9f1e6199792d5a3
Parents: 352a284
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Nov 25 11:33:49 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Nov 25 11:33:49 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/01bab865/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b116116..8106078 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 +## 0.11.0
+ * STORM-1220: Avoid double copying in the Kafka spout.
  * STORM-1340: Use Travis-CI build matrix to improve test execution times
  * STORM-1126: Allow a configMethod that takes no arguments (Flux)
  * STORM-1203: worker metadata file creation doesn't use storm.log.dir config


[2/3] storm git commit: Merge branch 'STORM-1220' of https://github.com/haohui/storm into STORM-1220

Posted by sr...@apache.org.
Merge branch 'STORM-1220' of https://github.com/haohui/storm into STORM-1220


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/352a2849
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/352a2849
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/352a2849

Branch: refs/heads/master
Commit: 352a28498648747b25ea07fad313d63e627c3f44
Parents: 20a864d 35f1da7
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Nov 25 11:22:34 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Nov 25 11:22:34 2015 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         |  8 ++--
 .../src/jvm/storm/kafka/KeyValueScheme.java     |  5 +--
 .../kafka/KeyValueSchemeAsMultiScheme.java      |  5 ++-
 .../jvm/storm/kafka/MessageMetadataScheme.java  |  6 ++-
 .../MessageMetadataSchemeAsMultiScheme.java     |  3 +-
 .../jvm/storm/kafka/StringKeyValueScheme.java   |  3 +-
 .../kafka/StringMessageAndMetadataScheme.java   |  7 ++--
 .../storm/kafka/StringMultiSchemeWithTopic.java | 21 +++-------
 .../src/jvm/storm/kafka/StringScheme.java       | 20 ++++++----
 .../storm/kafka/StringKeyValueSchemeTest.java   | 17 ++++++---
 .../src/test/storm/kafka/TestStringScheme.java  | 40 ++++++++++++++++++++
 .../jvm/backtype/storm/spout/MultiScheme.java   |  3 +-
 .../backtype/storm/spout/RawMultiScheme.java    |  3 +-
 .../src/jvm/backtype/storm/spout/RawScheme.java |  9 ++++-
 .../src/jvm/backtype/storm/spout/Scheme.java    |  3 +-
 .../storm/spout/SchemeAsMultiScheme.java        |  3 +-
 16 files changed, 106 insertions(+), 50 deletions(-)
----------------------------------------------------------------------