You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:42 UTC

[03/14] storm git commit: STORM-697: Test for MessageMetadataSchemeAsMultiScheme and generateTuples with metadata using SchemeAsMultiScheme

STORM-697: Test for MessageMetadataSchemeAsMultiScheme and generateTuples with metadata using SchemeAsMultiScheme


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e768665
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e768665
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e768665

Branch: refs/heads/master
Commit: 6e768665320d08815c53f27e706ef2ae1ff5af78
Parents: 6e4fde2
Author: matt.tieman <ma...@inin.com>
Authored: Tue Mar 3 11:48:57 2015 -0500
Committer: matt.tieman <ma...@inin.com>
Committed: Tue Mar 3 11:48:57 2015 -0500

----------------------------------------------------------------------
 .../src/test/storm/kafka/KafkaUtilsTest.java    | 66 +++++++++++++++++---
 1 file changed, 56 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e768665/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 1f1bbbc..a7c9b2b 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -17,9 +17,14 @@
  */
 package storm.kafka;
 
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.utils.Utils;
-import com.google.common.collect.ImmutableMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.List;
+import java.util.Properties;
+
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -27,18 +32,17 @@ import kafka.javaapi.producer.Producer;
 import kafka.message.MessageAndOffset;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import storm.kafka.trident.GlobalPartitionInformation;
+import org.mockito.Mockito;
 
-import java.util.List;
-import java.util.Properties;
+import storm.kafka.trident.GlobalPartitionInformation;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.utils.Utils;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import com.google.common.collect.ImmutableMap;
 
 public class KafkaUtilsTest {
 
@@ -166,6 +170,47 @@ public class KafkaUtilsTest {
             assertEquals(value, lists.iterator().next().get(0));
         }
     }
+    
+    @Test
+    public void generateTuplesWithMessageAndMetadataScheme() {
+        String value = "value";
+        Partition mockPartition = Mockito.mock(Partition.class);
+        mockPartition.partition = 0;
+        int offset = 0;
+        
+        config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
+        config.tupleMetaData = true;
+        
+        createTopicAndSendMessage(null, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
+            List<Object> values = lists.iterator().next(); 
+            assertEquals("Message is incorrect", value, values.get(0));
+            assertEquals("Offset is incorrect", offset, values.get(1));
+            assertEquals("Partition is incorrect", mockPartition.partition, values.get(2));
+        }
+    }
+    
+    @Test
+    public void generateTuplesWithValueSchemeAndMessageAndMetadata() {
+        String value = "value";
+        Partition mockPartition = Mockito.mock(Partition.class);
+        mockPartition.partition = 0;
+        int offset = 0;
+        
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        config.tupleMetaData = true;
+        
+        createTopicAndSendMessage(null, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
+            List<Object> values = lists.iterator().next();
+            assertEquals("Incorrect number of tuple values", 1, values.size());
+            assertEquals("Message is incorrect", value, values.get(0));
+        }
+    }
 
     private ByteBufferMessageSet getLastMessage() {
         long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
@@ -174,6 +219,7 @@ public class KafkaUtilsTest {
 
     private void runGetValueOnlyTuplesTest() {
         String value = "value";
+        
         createTopicAndSendMessage(null, value);
         ByteBufferMessageSet messageAndOffsets = getLastMessage();
         for (MessageAndOffset msg : messageAndOffsets) {