You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:49 UTC

[10/14] storm git commit: STORM-697: Review feedback: Removed tupleMetaData from SpoutConfig, used instanceof check on spout scheme to determine if tuples should be generated with meta data

STORM-697: Review feedback: Removed tupleMetaData from SpoutConfig; an instanceof check on the spout scheme now determines whether tuples are generated with metadata.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d79d9b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d79d9b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d79d9b3

Branch: refs/heads/master
Commit: 4d79d9b3b66bd381d6654e68fb418d2efad8e922
Parents: 47f4a3c
Author: Matt Tieman <ma...@inin.com>
Authored: Fri Oct 23 23:47:13 2015 -0400
Committer: Matt Tieman <ma...@inin.com>
Committed: Fri Oct 23 23:47:13 2015 -0400

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaConfig.java        |  4 ++--
 .../src/jvm/storm/kafka/KafkaUtils.java         | 11 ++-------
 .../MessageMetadataSchemeAsMultiScheme.java     |  1 -
 .../src/jvm/storm/kafka/PartitionManager.java   |  8 +++++--
 .../kafka/trident/TridentKafkaEmitter.java      |  4 ++--
 .../src/test/storm/kafka/KafkaUtilsTest.java    | 25 ++------------------
 6 files changed, 14 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index ea0e421..49c7526 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -23,7 +23,8 @@ import backtype.storm.spout.RawMultiScheme;
 import java.io.Serializable;
 
 public class KafkaConfig implements Serializable {
-
+    private static final long serialVersionUID = 5276718734571623855L;
+    
     public final BrokerHosts hosts;
     public final String topic;
     public final String clientId;
@@ -38,7 +39,6 @@ public class KafkaConfig implements Serializable {
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
-    public boolean tupleMetaData = false; // True to generate tuples from MessageAndRealOffset, false to generate only from the message
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 340f629..2e047b3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -208,19 +208,12 @@ public class KafkaUtils {
         return tups;
     }
     
-    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, long offset) {
-        Iterable<List<Object>> tups;
+    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
         ByteBuffer payload = msg.payload();
         if (payload == null) {
             return null;
         }
-        
-        if (kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
-            tups = ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
-        } else {
-            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
-        }
-        return tups;
+        return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
     }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index dcdbf8b..e89e391 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -12,7 +12,6 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
         super(scheme);
     }
 
-    @SuppressWarnings("unchecked")
     public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
         List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
         if (o == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 980ed8b..39e42ed 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,12 +23,16 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
+
 import com.google.common.collect.ImmutableMap;
+
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import storm.kafka.KafkaSpout.EmitState;
 import storm.kafka.KafkaSpout.MessageAndRealOffset;
 import storm.kafka.trident.MaxMetric;
@@ -137,8 +141,8 @@ public class PartitionManager {
             }
 
             Iterable<List<Object>> tups;
-            if (_spoutConfig.tupleMetaData) {
-                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition, toEmit.offset);
+            if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+                tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
             } else {
                 tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 5ac5709..39aac1a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -181,8 +181,8 @@ public class TridentKafkaEmitter {
 
     private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
         Iterable<List<Object>> values;
-        if (_config.tupleMetaData) {
-            values = KafkaUtils.generateTuples(_config, msg, partition, offset);
+        if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+            values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
         } else {
             values = KafkaUtils.generateTuples(_config, msg);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 8f4343e..65e8d2b 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -180,39 +180,18 @@ public class KafkaUtilsTest {
         mockPartition.partition = 0;
         long offset = 0L;
         
-        config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
-        config.tupleMetaData = true;
+        MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
         
         createTopicAndSendMessage(null, value);
         ByteBufferMessageSet messageAndOffsets = getLastMessage();
         for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset);
             List<Object> values = lists.iterator().next(); 
             assertEquals("Message is incorrect", value, values.get(0));
             assertEquals("Partition is incorrect", mockPartition.partition, values.get(1));
             assertEquals("Offset is incorrect", offset, values.get(2));
         }
     }
-    
-    @Test
-    public void generateTuplesWithValueSchemeAndMessageAndMetadata() {
-        String value = "value";
-        Partition mockPartition = Mockito.mock(Partition.class);
-        mockPartition.partition = 0;
-        Long offset = 0L;
-        
-        config.scheme = new SchemeAsMultiScheme(new StringScheme());
-        config.tupleMetaData = true;
-        
-        createTopicAndSendMessage(null, value);
-        ByteBufferMessageSet messageAndOffsets = getLastMessage();
-        for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
-            List<Object> values = lists.iterator().next();
-            assertEquals("Incorrect number of tuple values", 1, values.size());
-            assertEquals("Message is incorrect", value, values.get(0));
-        }
-    }
 
     private ByteBufferMessageSet getLastMessage() {
         long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;