You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:23 UTC
[24/50] [abbrv] git commit: Added support for reading message key
Added support for reading the Kafka message key alongside its payload (via KeyValueScheme)
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4de85c8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4de85c8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4de85c8e
Branch: refs/heads/master
Commit: 4de85c8e7f6f4244be9eb688c6a4576a9c88c2a1
Parents: f8afa99
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Feb 23 12:48:56 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Feb 23 12:48:56 2014 +0000
----------------------------------------------------------------------
src/jvm/storm/kafka/KafkaUtils.java | 22 ++++--
src/jvm/storm/kafka/KeyValueScheme.java | 11 +++
.../kafka/KeyValueSchemeAsMultiScheme.java | 19 +++++
src/jvm/storm/kafka/PartitionManager.java | 2 +-
src/jvm/storm/kafka/StringKeyValueScheme.java | 20 ++++++
src/jvm/storm/kafka/StringScheme.java | 10 ++-
.../kafka/trident/TridentKafkaEmitter.java | 3 +-
src/test/storm/kafka/KafkaUtilsTest.java | 76 ++++++++++++++++++--
.../storm/kafka/StringKeyValueSchemeTest.java | 38 ++++++++++
9 files changed, 187 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index d86519c..300d998 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -1,6 +1,7 @@
package storm.kafka;
import backtype.storm.metric.api.IMetric;
+import backtype.storm.utils.Utils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
@@ -9,6 +10,7 @@ import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.IBrokerReader;
@@ -16,10 +18,8 @@ import storm.kafka.trident.StaticBrokerReader;
import storm.kafka.trident.ZkBrokerReader;
import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.nio.ByteBuffer;
+import java.util.*;
public class KafkaUtils {
@@ -165,4 +165,18 @@ public class KafkaUtils {
}
return msgs;
}
+
+
+    /**
+     * Converts a fetched Kafka message into the tuples to emit, using the configured scheme.
+     * <p>
+     * When the message carries a key and the scheme is a {@link KeyValueSchemeAsMultiScheme},
+     * both key and payload are handed to the scheme; otherwise only the payload is deserialized.
+     *
+     * @param kafkaConfig configuration holding the scheme to deserialize with
+     * @param msg         the fetched Kafka message
+     * @return the tuples to emit, or {@code null} when there is nothing to emit (including a
+     *         message whose payload is {@code null})
+     */
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
+        ByteBuffer payload = msg.payload();
+        // Message.payload() can return null (a message stored without a value). Converting it with
+        // Utils.toByteArray would presumably fail, and callers already skip a null result, so such
+        // messages simply produce no tuples.
+        if (payload == null) {
+            return null;
+        }
+        ByteBuffer key = msg.key();
+        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
+            KeyValueSchemeAsMultiScheme keyValueScheme = (KeyValueSchemeAsMultiScheme) kafkaConfig.scheme;
+            return keyValueScheme.deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
+        }
+        return kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KeyValueScheme.java b/src/jvm/storm/kafka/KeyValueScheme.java
new file mode 100644
index 0000000..df31cb8
--- /dev/null
+++ b/src/jvm/storm/kafka/KeyValueScheme.java
@@ -0,0 +1,11 @@
+package storm.kafka;
+
+import backtype.storm.spout.Scheme;
+
+import java.util.List;
+
+/**
+ * A {@link Scheme} that can additionally turn a message's key and value into one tuple.
+ */
+public interface KeyValueScheme extends Scheme {
+
+    /**
+     * Deserializes a message key together with its value.
+     *
+     * @param key   raw key bytes
+     * @param value raw value bytes
+     * @return the values of the tuple to emit
+     */
+    List<Object> deserializeKeyAndValue(byte[] key, byte[] value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
new file mode 100644
index 0000000..2412a1c
--- /dev/null
+++ b/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -0,0 +1,19 @@
+package storm.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import java.util.Arrays;
+import java.util.List;
+
+public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{
+
+ public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
+ super(scheme);
+ }
+
+ public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) {
+ List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
+ if(o == null) return null;
+ else return Arrays.asList(o);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index f12c0d9..fc9d817 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -112,7 +112,7 @@ public class PartitionManager {
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
- Iterable<List<Object>> tups = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.msg.payload()));
+ Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StringKeyValueScheme.java b/src/jvm/storm/kafka/StringKeyValueScheme.java
new file mode 100644
index 0000000..a6adddb
--- /dev/null
+++ b/src/jvm/storm/kafka/StringKeyValueScheme.java
@@ -0,0 +1,20 @@
+package storm.kafka;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+
+/**
+ * A {@link StringScheme} that, when a message key is present, emits a single-entry map from the
+ * UTF-8 decoded key to the UTF-8 decoded value instead of the bare value.
+ */
+public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
+
+    /**
+     * @param key   raw key bytes; when {@code null} this falls back to {@link #deserialize(byte[])}
+     * @param value raw value bytes
+     * @return a one-element tuple: the decoded value, or a key-to-value map when a key is given
+     */
+    @Override
+    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (key != null) {
+            return new Values(ImmutableMap.of(deserializeString(key), deserializeString(value)));
+        }
+        return deserialize(value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StringScheme.java b/src/jvm/storm/kafka/StringScheme.java
index c4d8270..a809448 100644
--- a/src/jvm/storm/kafka/StringScheme.java
+++ b/src/jvm/storm/kafka/StringScheme.java
@@ -9,15 +9,21 @@ import java.util.List;
+/** Scheme that decodes each message payload as a UTF-8 string emitted in a single field. */
public class StringScheme implements Scheme {
+    /** Name of the single output field declared by {@link #getOutputFields()}. */
+ public static final String STRING_SCHEME_KEY = "str";
+
+    /** Decodes {@code bytes} as UTF-8 and returns the text as a one-element tuple. */
public List<Object> deserialize(byte[] bytes) {
+ return new Values(deserializeString(bytes));
+ }
+
+    /**
+     * Decodes raw bytes as UTF-8.
+     *
+     * @param string the raw bytes to decode (despite its name, this parameter holds bytes)
+     * @return the decoded text
+     * @throws RuntimeException wrapping {@link UnsupportedEncodingException} if UTF-8 is unsupported
+     */
+ public static String deserializeString(byte[] string) {
try {
- return new Values(new String(bytes, "UTF-8"));
+ return new String(string, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
+    /** Declares one output field named {@link #STRING_SCHEME_KEY} ({@code "str"}). */
public Fields getOutputFields() {
- return new Fields("str");
+ return new Fields(STRING_SCHEME_KEY);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 66785f0..973ce8f 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -155,8 +155,7 @@ public class TridentKafkaEmitter {
}
private void emit(TridentCollector collector, Message msg) {
- Iterable<List<Object>> values =
- _config.scheme.deserialize(Utils.toByteArray(msg.payload()));
+ Iterable<List<Object>> values = KafkaUtils.generateTuples(_config, msg);
if (values != null) {
for (List<Object> value : values) {
collector.emit(value);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index 20a4221..db270c2 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -1,10 +1,13 @@
package storm.kafka;
+import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.utils.Utils;
+import com.google.common.collect.ImmutableMap;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.javaapi.producer.Producer;
+import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.junit.After;
@@ -12,10 +15,12 @@ import org.junit.Before;
import org.junit.Test;
import storm.kafka.trident.GlobalPartitionInformation;
+import java.util.List;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class KafkaUtilsTest {
@@ -90,19 +95,80 @@ public class KafkaUtilsTest {
assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
}
- private String createTopicAndSendMessage() {
+ @Test
+ public void generateTuplesWithoutKeyAndKeyValueScheme() {
+ config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+ runGetValueOnlyTuplesTest();
+ }
+
+    /** A keyed message read through a key/value scheme should yield a key-to-value map. */
+    @Test
+    public void generateTuplesWithKeyAndKeyValueScheme() {
+        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+        String value = "value";
+        String key = "key";
+        createTopicAndSendMessage(key, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        // Take the first message directly: a for-each loop would pass vacuously if the fetch
+        // returned nothing, whereas next() throws and fails the test.
+        MessageAndOffset msg = messageAndOffsets.iterator().next();
+        Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message());
+        assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0));
+    }
+
+    /** A key-less message read through a plain value scheme should yield the value. */
+    @Test
+    public void generateTuplesWithValueScheme() {
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        runGetValueOnlyTuplesTest();
+    }
+
+    /** A keyed message read through a plain value scheme should yield only the value. */
+    @Test
+    public void generateTuplesWithValueSchemeAndKeyValueMessage() {
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        String value = "value";
+        String key = "key";
+        createTopicAndSendMessage(key, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        // Take the first message directly: a for-each loop would pass vacuously if the fetch
+        // returned nothing, whereas next() throws and fails the test.
+        MessageAndOffset msg = messageAndOffsets.iterator().next();
+        Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message());
+        assertEquals(value, lists.iterator().next().get(0));
+    }
+
+    /**
+     * Fetches messages from partition 0 of the test topic, starting one before the latest offset
+     * reported by the broker.
+     */
+    private ByteBufferMessageSet getLastMessage() {
+        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
+        long offsetOfLastMessage = latestOffset - 1;
+        Partition partition = new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0);
+        return KafkaUtils.fetchMessages(config, simpleConsumer, partition, offsetOfLastMessage);
+    }
+
+    /**
+     * Sends a key-less message and checks that the configured scheme turns it into a tuple whose
+     * first field is the message value.
+     */
+    private void runGetValueOnlyTuplesTest() {
+        String value = "value";
+        createTopicAndSendMessage(null, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        // Take the first message directly: a for-each loop would pass vacuously if the fetch
+        // returned nothing, whereas next() throws and fails the test.
+        MessageAndOffset msg = messageAndOffsets.iterator().next();
+        Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message());
+        assertEquals(value, lists.iterator().next().get(0));
+    }
+
+
+    /** Sends a key-less message with the fixed value {@code "someValue"}. */
+    private void createTopicAndSendMessage() {
+        createTopicAndSendMessage("someValue");
+    }
+
+    /** Sends a key-less message carrying {@code value}. */
+    private void createTopicAndSendMessage(String value) {
+        String noKey = null;
+        createTopicAndSendMessage(noKey, value);
+    }
+
+    /**
+     * Publishes one message to {@code config.topic} through a producer created for this call.
+     *
+     * @param key   message key, or {@code null} for a key-less message
+     * @param value message value
+     */
+    private void createTopicAndSendMessage(String key, String value) {
Properties p = new Properties();
p.setProperty("metadata.broker.list", "localhost:49123");
p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(p);
Producer<String, String> producer = new Producer<String, String>(producerConfig);
- String value = "value";
- producer.send(new KeyedMessage<String, String>(config.topic, value));
- return value;
+        try {
+            producer.send(new KeyedMessage<String, String>(config.topic, key, value));
+        } finally {
+            // A new producer is created on every call; close it so repeated calls do not leave it open.
+            producer.close();
+        }
}
private void sendMessageAndAssertValueForOffset(long offset) {
- String value = createTopicAndSendMessage();
+ String value = "test";
+ createTopicAndSendMessage(value);
ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
assertThat(message, is(equalTo(value)));
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/test/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/StringKeyValueSchemeTest.java b/src/test/storm/kafka/StringKeyValueSchemeTest.java
new file mode 100644
index 0000000..4413c7b
--- /dev/null
+++ b/src/test/storm/kafka/StringKeyValueSchemeTest.java
@@ -0,0 +1,38 @@
+package storm.kafka;
+
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StringKeyValueScheme}: plain deserialization, declared output fields, and
+ * key/value deserialization with and without a key.
+ */
+public class StringKeyValueSchemeTest {
+
+    private final StringKeyValueScheme scheme = new StringKeyValueScheme();
+
+    /** Encodes {@code s} with the platform default charset. */
+    private static byte[] bytes(String s) {
+        return s.getBytes();
+    }
+
+    @Test
+    public void testDeserialize() throws Exception {
+        Object expected = Arrays.asList("test");
+        Object actual = scheme.deserialize(bytes("test"));
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testGetOutputFields() throws Exception {
+        Fields outputFields = scheme.getOutputFields();
+        assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY));
+        assertEquals(1, outputFields.size());
+    }
+
+    /** With a null key the scheme falls back to plain value deserialization. */
+    @Test
+    public void testDeserializeWithNullKeyAndValue() throws Exception {
+        Object expected = Arrays.asList("test");
+        Object actual = scheme.deserializeKeyAndValue(null, bytes("test"));
+        assertEquals(expected, actual);
+    }
+
+    /** With a key the scheme emits a single key-to-value map. */
+    @Test
+    public void testDeserializeWithKeyAndValue() throws Exception {
+        Object expected = Arrays.asList(ImmutableMap.of("key", "test"));
+        Object actual = scheme.deserializeKeyAndValue(bytes("key"), bytes("test"));
+        assertEquals(expected, actual);
+    }
+}