You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:23 UTC

[24/50] [abbrv] git commit: Added support for reading message key

Added support for reading message key


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4de85c8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4de85c8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4de85c8e

Branch: refs/heads/master
Commit: 4de85c8e7f6f4244be9eb688c6a4576a9c88c2a1
Parents: f8afa99
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Feb 23 12:48:56 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Feb 23 12:48:56 2014 +0000

----------------------------------------------------------------------
 src/jvm/storm/kafka/KafkaUtils.java             | 22 ++++--
 src/jvm/storm/kafka/KeyValueScheme.java         | 11 +++
 .../kafka/KeyValueSchemeAsMultiScheme.java      | 19 +++++
 src/jvm/storm/kafka/PartitionManager.java       |  2 +-
 src/jvm/storm/kafka/StringKeyValueScheme.java   | 20 ++++++
 src/jvm/storm/kafka/StringScheme.java           | 10 ++-
 .../kafka/trident/TridentKafkaEmitter.java      |  3 +-
 src/test/storm/kafka/KafkaUtilsTest.java        | 76 ++++++++++++++++++--
 .../storm/kafka/StringKeyValueSchemeTest.java   | 38 ++++++++++
 9 files changed, 187 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index d86519c..300d998 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -1,6 +1,7 @@
 package storm.kafka;
 
 import backtype.storm.metric.api.IMetric;
+import backtype.storm.utils.Utils;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -9,6 +10,7 @@ import kafka.javaapi.FetchResponse;
 import kafka.javaapi.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.IBrokerReader;
@@ -16,10 +18,8 @@ import storm.kafka.trident.StaticBrokerReader;
 import storm.kafka.trident.ZkBrokerReader;
 
 import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.nio.ByteBuffer;
+import java.util.*;
 
 
 public class KafkaUtils {
@@ -165,4 +165,18 @@ public class KafkaUtils {
         }
         return msgs;
     }
+
+
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
+        Iterable<List<Object>> tups;
+        ByteBuffer payload = msg.payload();
+        ByteBuffer key = msg.key();
+        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
+            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
+        } else {
+            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
+        }
+        return tups;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KeyValueScheme.java b/src/jvm/storm/kafka/KeyValueScheme.java
new file mode 100644
index 0000000..df31cb8
--- /dev/null
+++ b/src/jvm/storm/kafka/KeyValueScheme.java
@@ -0,0 +1,11 @@
+package storm.kafka;
+
+import backtype.storm.spout.Scheme;
+
+import java.util.List;
+
+public interface KeyValueScheme extends Scheme {
+
+    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
new file mode 100644
index 0000000..2412a1c
--- /dev/null
+++ b/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -0,0 +1,19 @@
+package storm.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import java.util.Arrays;
+import java.util.List;
+
+public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{
+
+    public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
+        super(scheme);
+    }
+
+    public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) {
+        List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
+        if(o == null) return null;
+        else return Arrays.asList(o);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index f12c0d9..fc9d817 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -112,7 +112,7 @@ public class PartitionManager {
             if (toEmit == null) {
                 return EmitState.NO_EMITTED;
             }
-            Iterable<List<Object>> tups = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.msg.payload()));
+            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
             if (tups != null) {
                 for (List<Object> tup : tups) {
                     collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StringKeyValueScheme.java b/src/jvm/storm/kafka/StringKeyValueScheme.java
new file mode 100644
index 0000000..a6adddb
--- /dev/null
+++ b/src/jvm/storm/kafka/StringKeyValueScheme.java
@@ -0,0 +1,20 @@
+package storm.kafka;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+
+public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
+
+    @Override
+    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
+        if ( key == null ) {
+            return deserialize(value);
+        }
+        String keyString = StringScheme.deserializeString(key);
+        String valueString = StringScheme.deserializeString(value);
+        return new Values(ImmutableMap.of(keyString, valueString));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StringScheme.java b/src/jvm/storm/kafka/StringScheme.java
index c4d8270..a809448 100644
--- a/src/jvm/storm/kafka/StringScheme.java
+++ b/src/jvm/storm/kafka/StringScheme.java
@@ -9,15 +9,21 @@ import java.util.List;
 
 public class StringScheme implements Scheme {
 
+    public static final String STRING_SCHEME_KEY = "str";
+
     public List<Object> deserialize(byte[] bytes) {
+        return new Values(deserializeString(bytes));
+    }
+
+    public static String deserializeString(byte[] string) {
         try {
-            return new Values(new String(bytes, "UTF-8"));
+            return new String(string, "UTF-8");
         } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
         }
     }
 
     public Fields getOutputFields() {
-        return new Fields("str");
+        return new Fields(STRING_SCHEME_KEY);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 66785f0..973ce8f 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -155,8 +155,7 @@ public class TridentKafkaEmitter {
     }
 
     private void emit(TridentCollector collector, Message msg) {
-        Iterable<List<Object>> values =
-                _config.scheme.deserialize(Utils.toByteArray(msg.payload()));
+        Iterable<List<Object>> values = KafkaUtils.generateTuples(_config, msg);
         if (values != null) {
             for (List<Object> value : values) {
                 collector.emit(value);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index 20a4221..db270c2 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -1,10 +1,13 @@
 package storm.kafka;
 
+import backtype.storm.spout.SchemeAsMultiScheme;
 import backtype.storm.utils.Utils;
+import com.google.common.collect.ImmutableMap;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.javaapi.producer.Producer;
+import kafka.message.MessageAndOffset;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import org.junit.After;
@@ -12,10 +15,12 @@ import org.junit.Before;
 import org.junit.Test;
 import storm.kafka.trident.GlobalPartitionInformation;
 
+import java.util.List;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 public class KafkaUtilsTest {
@@ -90,19 +95,80 @@ public class KafkaUtilsTest {
         assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
     }
 
-    private String createTopicAndSendMessage() {
+    @Test
+    public void generateTuplesWithoutKeyAndKeyValueScheme() {
+        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+        runGetValueOnlyTuplesTest();
+    }
+
+    @Test
+    public void generateTuplesWithKeyAndKeyValueScheme() {
+        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+        String value = "value";
+        String key = "key";
+        createTopicAndSendMessage(key, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message());
+            assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0));
+        }
+    }
+
+    @Test
+    public void generateTupelsWithValueScheme() {
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        runGetValueOnlyTuplesTest();
+    }
+
+    @Test
+    public void generateTuplesWithValueSchemeAndKeyValueMessage() {
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        String value = "value";
+        String key = "key";
+        createTopicAndSendMessage(key, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message());
+            assertEquals(value, lists.iterator().next().get(0));
+        }
+    }
+
+    private ByteBufferMessageSet getLastMessage() {
+        long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
+        return KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offsetOfLastMessage);
+    }
+
+    private void runGetValueOnlyTuplesTest() {
+        String value = "value";
+        createTopicAndSendMessage(null, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message());
+            assertEquals(value, lists.iterator().next().get(0));
+        }
+    }
+
+
+    private void createTopicAndSendMessage() {
+        createTopicAndSendMessage(null, "someValue");
+    }
+
+    private void createTopicAndSendMessage(String value) {
+        createTopicAndSendMessage(null, value);
+    }
+
+    private void createTopicAndSendMessage(String key, String value) {
         Properties p = new Properties();
         p.setProperty("metadata.broker.list", "localhost:49123");
         p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
         ProducerConfig producerConfig = new ProducerConfig(p);
         Producer<String, String> producer = new Producer<String, String>(producerConfig);
-        String value = "value";
-        producer.send(new KeyedMessage<String, String>(config.topic, value));
-        return value;
+        producer.send(new KeyedMessage<String, String>(config.topic, key, value));
     }
 
     private void sendMessageAndAssertValueForOffset(long offset) {
-        String value = createTopicAndSendMessage();
+        String value = "test";
+        createTopicAndSendMessage(value);
         ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
         String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
         assertThat(message, is(equalTo(value)));

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4de85c8e/src/test/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/StringKeyValueSchemeTest.java b/src/test/storm/kafka/StringKeyValueSchemeTest.java
new file mode 100644
index 0000000..4413c7b
--- /dev/null
+++ b/src/test/storm/kafka/StringKeyValueSchemeTest.java
@@ -0,0 +1,38 @@
+package storm.kafka;
+
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StringKeyValueSchemeTest {
+
+    private StringKeyValueScheme scheme = new StringKeyValueScheme();
+
+    @Test
+    public void testDeserialize() throws Exception {
+        assertEquals(Arrays.asList("test"), scheme.deserialize("test".getBytes()));
+    }
+
+    @Test
+    public void testGetOutputFields() throws Exception {
+        Fields outputFields = scheme.getOutputFields();
+        assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY));
+        assertEquals(1, outputFields.size());
+    }
+
+    @Test
+    public void testDeserializeWithNullKeyAndValue() throws Exception {
+        assertEquals(Arrays.asList("test"), scheme.deserializeKeyAndValue(null, "test".getBytes()));
+    }
+
+    @Test
+    public void testDeserializeWithKeyAndValue() throws Exception {
+        assertEquals(Arrays.asList(ImmutableMap.of("key", "test")),
+                scheme.deserializeKeyAndValue("key".getBytes(), "test".getBytes()));
+    }
+}