You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:42 UTC
[03/14] storm git commit: STORM-697: Test for
MessageMetadataSchemeAsMultiScheme and generateTuples with metadata using
SchemeAsMultiScheme
STORM-697: Test for MessageMetadataSchemeAsMultiScheme and generateTuples with metadata using SchemeAsMultiScheme
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e768665
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e768665
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e768665
Branch: refs/heads/master
Commit: 6e768665320d08815c53f27e706ef2ae1ff5af78
Parents: 6e4fde2
Author: matt.tieman <ma...@inin.com>
Authored: Tue Mar 3 11:48:57 2015 -0500
Committer: matt.tieman <ma...@inin.com>
Committed: Tue Mar 3 11:48:57 2015 -0500
----------------------------------------------------------------------
.../src/test/storm/kafka/KafkaUtilsTest.java | 66 +++++++++++++++++---
1 file changed, 56 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6e768665/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 1f1bbbc..a7c9b2b 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -17,9 +17,14 @@
*/
package storm.kafka;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.utils.Utils;
-import com.google.common.collect.ImmutableMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.List;
+import java.util.Properties;
+
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
@@ -27,18 +32,17 @@ import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import storm.kafka.trident.GlobalPartitionInformation;
+import org.mockito.Mockito;
-import java.util.List;
-import java.util.Properties;
+import storm.kafka.trident.GlobalPartitionInformation;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.utils.Utils;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import com.google.common.collect.ImmutableMap;
public class KafkaUtilsTest {
@@ -166,6 +170,47 @@ public class KafkaUtilsTest {
assertEquals(value, lists.iterator().next().get(0));
}
}
+
+ @Test
+ public void generateTuplesWithMessageAndMetadataScheme() {
+ String value = "value";
+ Partition mockPartition = Mockito.mock(Partition.class);
+ mockPartition.partition = 0;
+ int offset = 0;
+
+ config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
+ config.tupleMetaData = true;
+
+ createTopicAndSendMessage(null, value);
+ ByteBufferMessageSet messageAndOffsets = getLastMessage();
+ for (MessageAndOffset msg : messageAndOffsets) {
+ Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
+ List<Object> values = lists.iterator().next();
+ assertEquals("Message is incorrect", value, values.get(0));
+ assertEquals("Offset is incorrect", offset, values.get(1));
+ assertEquals("Partition is incorrect", mockPartition.partition, values.get(2));
+ }
+ }
+
+ @Test
+ public void generateTuplesWithValueSchemeAndMessageAndMetadata() {
+ String value = "value";
+ Partition mockPartition = Mockito.mock(Partition.class);
+ mockPartition.partition = 0;
+ int offset = 0;
+
+ config.scheme = new SchemeAsMultiScheme(new StringScheme());
+ config.tupleMetaData = true;
+
+ createTopicAndSendMessage(null, value);
+ ByteBufferMessageSet messageAndOffsets = getLastMessage();
+ for (MessageAndOffset msg : messageAndOffsets) {
+ Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
+ List<Object> values = lists.iterator().next();
+ assertEquals("Incorrect number of tuple values", 1, values.size());
+ assertEquals("Message is incorrect", value, values.get(0));
+ }
+ }
private ByteBufferMessageSet getLastMessage() {
long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
@@ -174,6 +219,7 @@ public class KafkaUtilsTest {
private void runGetValueOnlyTuplesTest() {
String value = "value";
+
createTopicAndSendMessage(null, value);
ByteBufferMessageSet messageAndOffsets = getLastMessage();
for (MessageAndOffset msg : messageAndOffsets) {