You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/25 20:57:49 UTC
[1/3] storm git commit: STORM-1220. Avoid double copying in the Kafka
spout.
Repository: storm
Updated Branches:
refs/heads/master 20a864d08 -> 01bab8654
STORM-1220. Avoid double copying in the Kafka spout.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/35f1da78
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/35f1da78
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/35f1da78
Branch: refs/heads/master
Commit: 35f1da7890b597c9634904c0589f6ac64584e539
Parents: a8d253a
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Nov 19 13:32:20 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Nov 24 13:23:16 2015 -0800
----------------------------------------------------------------------
.../src/jvm/storm/kafka/KafkaUtils.java | 8 ++--
.../src/jvm/storm/kafka/KeyValueScheme.java | 5 +--
.../kafka/KeyValueSchemeAsMultiScheme.java | 5 ++-
.../jvm/storm/kafka/MessageMetadataScheme.java | 6 ++-
.../MessageMetadataSchemeAsMultiScheme.java | 3 +-
.../jvm/storm/kafka/StringKeyValueScheme.java | 3 +-
.../kafka/StringMessageAndMetadataScheme.java | 7 ++--
.../storm/kafka/StringMultiSchemeWithTopic.java | 21 +++-------
.../src/jvm/storm/kafka/StringScheme.java | 20 ++++++----
.../storm/kafka/StringKeyValueSchemeTest.java | 17 ++++++---
.../src/test/storm/kafka/TestStringScheme.java | 40 ++++++++++++++++++++
.../jvm/backtype/storm/spout/MultiScheme.java | 3 +-
.../backtype/storm/spout/RawMultiScheme.java | 3 +-
.../src/jvm/backtype/storm/spout/RawScheme.java | 9 ++++-
.../src/jvm/backtype/storm/spout/Scheme.java | 3 +-
.../storm/spout/SchemeAsMultiScheme.java | 3 +-
16 files changed, 106 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index cd684df..52cdde8 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -221,12 +221,12 @@ public class KafkaUtils {
}
ByteBuffer key = msg.key();
if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
- tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
+ tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
} else {
if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
- tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, Utils.toByteArray(payload));
+ tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload);
} else {
- tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
+ tups = kafkaConfig.scheme.deserialize(payload);
}
}
return tups;
@@ -237,7 +237,7 @@ public class KafkaUtils {
if (payload == null) {
return null;
}
- return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
+ return scheme.deserializeMessageWithMetadata(payload, partition, offset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
index f42f7c8..7c0dc6c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
@@ -19,10 +19,9 @@ package storm.kafka;
import backtype.storm.spout.Scheme;
+import java.nio.ByteBuffer;
import java.util.List;
public interface KeyValueScheme extends Scheme {
-
- public List<Object> deserializeKeyAndValue(byte[] key, byte[] value);
-
+ List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
index 7def6ac..d27ae7e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -19,16 +19,17 @@ package storm.kafka;
import backtype.storm.spout.SchemeAsMultiScheme;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{
+public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
super(scheme);
}
- public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) {
+ public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) {
List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
if(o == null) return null;
else return Arrays.asList(o);
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
index 92a5598..62f652f 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
@@ -17,9 +17,11 @@
*/
package storm.kafka;
-import java.util.List;
import backtype.storm.spout.Scheme;
+import java.nio.ByteBuffer;
+import java.util.List;
+
public interface MessageMetadataScheme extends Scheme {
- public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset);
+ List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index 0567809..f23a101 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -17,6 +17,7 @@
*/
package storm.kafka;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -29,7 +30,7 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
super(scheme);
}
- public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
+ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
if (o == null) {
return null;
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
index 41cacb6..6f6d339 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
@@ -20,12 +20,13 @@ package storm.kafka;
import backtype.storm.tuple.Values;
import com.google.common.collect.ImmutableMap;
+import java.nio.ByteBuffer;
import java.util.List;
public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
@Override
- public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
+ public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
if ( key == null ) {
return deserialize(value);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
index 031d497..1708b97 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
@@ -17,11 +17,12 @@
*/
package storm.kafka;
-import java.util.List;
-
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
+import java.nio.ByteBuffer;
+import java.util.List;
+
public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme {
private static final long serialVersionUID = -5441841920447947374L;
@@ -29,7 +30,7 @@ public class StringMessageAndMetadataScheme extends StringScheme implements Mess
public static final String STRING_SCHEME_OFFSET = "offset";
@Override
- public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
+ public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
String stringMessage = StringScheme.deserializeString(message);
return new Values(stringMessage, partition.partition, offset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java b/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
index e0da2ce..1e7f216 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
@@ -18,13 +18,12 @@
package storm.kafka;
import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
public class StringMultiSchemeWithTopic
@@ -34,24 +33,16 @@ public class StringMultiSchemeWithTopic
public static final String TOPIC_KEY = "topic";
@Override
- public Iterable<List<Object>> deserialize(byte[] bytes) {
+ public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
throw new NotImplementedException();
}
- public Iterable<List<Object>> deserializeWithTopic(String topic, byte[] bytes) {
- List<Object> items = new Values(deserializeString(bytes), topic);
- return Arrays.asList(items);
+ public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) {
+ List<Object> items = new Values(StringScheme.deserializeString(bytes), topic);
+ return Collections.singletonList(items);
}
public Fields getOutputFields() {
return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
}
-
- public static String deserializeString(byte[] string) {
- try {
- return new String(string, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
index 286dc9b..1071e60 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
@@ -20,23 +20,27 @@ package storm.kafka;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
-import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.List;
public class StringScheme implements Scheme {
-
+ private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
public static final String STRING_SCHEME_KEY = "str";
- public List<Object> deserialize(byte[] bytes) {
+ public List<Object> deserialize(ByteBuffer bytes) {
return new Values(deserializeString(bytes));
}
- public static String deserializeString(byte[] string) {
- try {
- return new String(string, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
+ public static String deserializeString(ByteBuffer string) {
+ if (string.hasArray()) {
+ int base = string.arrayOffset();
+ return new String(string.array(), base + string.position(), string.remaining());
+ } else {
+ return new String(Utils.toByteArray(string), UTF8_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
index 0b786ba..eddb900 100644
--- a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
@@ -21,7 +21,9 @@ import backtype.storm.tuple.Fields;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -32,7 +34,7 @@ public class StringKeyValueSchemeTest {
@Test
public void testDeserialize() throws Exception {
- assertEquals(Arrays.asList("test"), scheme.deserialize("test".getBytes()));
+ assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test")));
}
@Test
@@ -44,12 +46,17 @@ public class StringKeyValueSchemeTest {
@Test
public void testDeserializeWithNullKeyAndValue() throws Exception {
- assertEquals(Arrays.asList("test"), scheme.deserializeKeyAndValue(null, "test".getBytes()));
+ assertEquals(Collections.singletonList("test"),
+ scheme.deserializeKeyAndValue(null, wrapString("test")));
}
@Test
public void testDeserializeWithKeyAndValue() throws Exception {
- assertEquals(Arrays.asList(ImmutableMap.of("key", "test")),
- scheme.deserializeKeyAndValue("key".getBytes(), "test".getBytes()));
+ assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")),
+ scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test")));
+ }
+
+ private static ByteBuffer wrapString(String s) {
+ return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
new file mode 100644
index 0000000..ae36409
--- /dev/null
+++ b/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStringScheme {
+ @Test
+ public void testDeserializeString() {
+ String s = "foo";
+ byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+ ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
+ direct.put(bytes);
+ direct.flip();
+ String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes));
+ String s2 = StringScheme.deserializeString(direct);
+ assertEquals(s, s1);
+ assertEquals(s, s2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java b/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
index ca2ce91..57bf4ce 100644
--- a/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/MultiScheme.java
@@ -17,12 +17,13 @@
*/
package backtype.storm.spout;
+import java.nio.ByteBuffer;
import java.util.List;
import java.io.Serializable;
import backtype.storm.tuple.Fields;
public interface MultiScheme extends Serializable {
- public Iterable<List<Object>> deserialize(byte[] ser);
+ public Iterable<List<Object>> deserialize(ByteBuffer ser);
public Fields getOutputFields();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java b/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
index 7f73975..824d16c 100644
--- a/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/RawMultiScheme.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.spout;
+import java.nio.ByteBuffer;
import java.util.List;
import backtype.storm.tuple.Fields;
@@ -27,7 +28,7 @@ import static java.util.Arrays.asList;
public class RawMultiScheme implements MultiScheme {
@Override
- public Iterable<List<Object>> deserialize(byte[] ser) {
+ public Iterable<List<Object>> deserialize(ByteBuffer ser) {
return asList(tuple(ser));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/RawScheme.java b/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
index 7e26770..937acb7 100644
--- a/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/RawScheme.java
@@ -18,12 +18,17 @@
package backtype.storm.spout;
import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
import java.util.List;
import static backtype.storm.utils.Utils.tuple;
public class RawScheme implements Scheme {
- public List<Object> deserialize(byte[] ser) {
- return tuple(ser);
+ public List<Object> deserialize(ByteBuffer ser) {
+ // Maintain backward compatibility for 0.10
+ byte[] b = Utils.toByteArray(ser);
+ return tuple(new Object[]{b});
}
public Fields getOutputFields() {
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/Scheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/Scheme.java b/storm-core/src/jvm/backtype/storm/spout/Scheme.java
index ca68954..d696a9c 100644
--- a/storm-core/src/jvm/backtype/storm/spout/Scheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/Scheme.java
@@ -19,10 +19,11 @@ package backtype.storm.spout;
import backtype.storm.tuple.Fields;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.List;
public interface Scheme extends Serializable {
- public List<Object> deserialize(byte[] ser);
+ List<Object> deserialize(ByteBuffer ser);
public Fields getOutputFields();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35f1da78/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java b/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
index 29f7fce..a49d55f 100644
--- a/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
+++ b/storm-core/src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.spout;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -29,7 +30,7 @@ public class SchemeAsMultiScheme implements MultiScheme {
this.scheme = scheme;
}
- @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
+ @Override public Iterable<List<Object>> deserialize(final ByteBuffer ser) {
List<Object> o = scheme.deserialize(ser);
if(o == null) return null;
else return Arrays.asList(o);
[3/3] storm git commit: Added STORM-1220 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1220 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01bab865
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01bab865
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01bab865
Branch: refs/heads/master
Commit: 01bab865400df5dfce3909f8b9f1e6199792d5a3
Parents: 352a284
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Nov 25 11:33:49 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Nov 25 11:33:49 2015 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/01bab865/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b116116..8106078 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
+## 0.11.0
+ * STORM-1220: Avoid double copying in the Kafka spout.
* STORM-1340: Use Travis-CI build matrix to improve test execution times
* STORM-1126: Allow a configMethod that takes no arguments (Flux)
* STORM-1203: worker metadata file creation doesn't use storm.log.dir config
[2/3] storm git commit: Merge branch 'STORM-1220' of
https://github.com/haohui/storm into STORM-1220
Posted by sr...@apache.org.
Merge branch 'STORM-1220' of https://github.com/haohui/storm into STORM-1220
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/352a2849
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/352a2849
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/352a2849
Branch: refs/heads/master
Commit: 352a28498648747b25ea07fad313d63e627c3f44
Parents: 20a864d 35f1da7
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Nov 25 11:22:34 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Nov 25 11:22:34 2015 -0800
----------------------------------------------------------------------
.../src/jvm/storm/kafka/KafkaUtils.java | 8 ++--
.../src/jvm/storm/kafka/KeyValueScheme.java | 5 +--
.../kafka/KeyValueSchemeAsMultiScheme.java | 5 ++-
.../jvm/storm/kafka/MessageMetadataScheme.java | 6 ++-
.../MessageMetadataSchemeAsMultiScheme.java | 3 +-
.../jvm/storm/kafka/StringKeyValueScheme.java | 3 +-
.../kafka/StringMessageAndMetadataScheme.java | 7 ++--
.../storm/kafka/StringMultiSchemeWithTopic.java | 21 +++-------
.../src/jvm/storm/kafka/StringScheme.java | 20 ++++++----
.../storm/kafka/StringKeyValueSchemeTest.java | 17 ++++++---
.../src/test/storm/kafka/TestStringScheme.java | 40 ++++++++++++++++++++
.../jvm/backtype/storm/spout/MultiScheme.java | 3 +-
.../backtype/storm/spout/RawMultiScheme.java | 3 +-
.../src/jvm/backtype/storm/spout/RawScheme.java | 9 ++++-
.../src/jvm/backtype/storm/spout/Scheme.java | 3 +-
.../storm/spout/SchemeAsMultiScheme.java | 3 +-
16 files changed, 106 insertions(+), 50 deletions(-)
----------------------------------------------------------------------