You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:49 UTC
[10/14] storm git commit: STORM-697: Review feedback: Removed
tupleMetaData from SpoutConfig,
used instanceof check on spout scheme to determine if tuples should be
generated with meta data
STORM-697: Review feedback: Removed tupleMetaData from SpoutConfig, used instanceof check on spout scheme to determine if tuples should be generated with meta data
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d79d9b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d79d9b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d79d9b3
Branch: refs/heads/master
Commit: 4d79d9b3b66bd381d6654e68fb418d2efad8e922
Parents: 47f4a3c
Author: Matt Tieman <ma...@inin.com>
Authored: Fri Oct 23 23:47:13 2015 -0400
Committer: Matt Tieman <ma...@inin.com>
Committed: Fri Oct 23 23:47:13 2015 -0400
----------------------------------------------------------------------
.../src/jvm/storm/kafka/KafkaConfig.java | 4 ++--
.../src/jvm/storm/kafka/KafkaUtils.java | 11 ++-------
.../MessageMetadataSchemeAsMultiScheme.java | 1 -
.../src/jvm/storm/kafka/PartitionManager.java | 8 +++++--
.../kafka/trident/TridentKafkaEmitter.java | 4 ++--
.../src/test/storm/kafka/KafkaUtilsTest.java | 25 ++------------------
6 files changed, 14 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index ea0e421..49c7526 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -23,7 +23,8 @@ import backtype.storm.spout.RawMultiScheme;
import java.io.Serializable;
public class KafkaConfig implements Serializable {
-
+ private static final long serialVersionUID = 5276718734571623855L;
+
public final BrokerHosts hosts;
public final String topic;
public final String clientId;
@@ -38,7 +39,6 @@ public class KafkaConfig implements Serializable {
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
- public boolean tupleMetaData = false; // True to generate tuples from MessageAndRealOffset, false to generate only from the message
public KafkaConfig(BrokerHosts hosts, String topic) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 340f629..2e047b3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -208,19 +208,12 @@ public class KafkaUtils {
return tups;
}
- public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, long offset) {
- Iterable<List<Object>> tups;
+ public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
ByteBuffer payload = msg.payload();
if (payload == null) {
return null;
}
-
- if (kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
- tups = ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
- } else {
- tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
- }
- return tups;
+ return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index dcdbf8b..e89e391 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -12,7 +12,6 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
super(scheme);
}
- @SuppressWarnings("unchecked")
public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
if (o == null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 980ed8b..39e42ed 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,12 +23,16 @@ import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
+
import com.google.common.collect.ImmutableMap;
+
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import storm.kafka.KafkaSpout.EmitState;
import storm.kafka.KafkaSpout.MessageAndRealOffset;
import storm.kafka.trident.MaxMetric;
@@ -137,8 +141,8 @@ public class PartitionManager {
}
Iterable<List<Object>> tups;
- if (_spoutConfig.tupleMetaData) {
- tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition, toEmit.offset);
+ if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+ tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
} else {
tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 5ac5709..39aac1a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -181,8 +181,8 @@ public class TridentKafkaEmitter {
private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
Iterable<List<Object>> values;
- if (_config.tupleMetaData) {
- values = KafkaUtils.generateTuples(_config, msg, partition, offset);
+ if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+ values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
} else {
values = KafkaUtils.generateTuples(_config, msg);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 8f4343e..65e8d2b 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -180,39 +180,18 @@ public class KafkaUtilsTest {
mockPartition.partition = 0;
long offset = 0L;
- config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
- config.tupleMetaData = true;
+ MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
createTopicAndSendMessage(null, value);
ByteBufferMessageSet messageAndOffsets = getLastMessage();
for (MessageAndOffset msg : messageAndOffsets) {
- Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
+ Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset);
List<Object> values = lists.iterator().next();
assertEquals("Message is incorrect", value, values.get(0));
assertEquals("Partition is incorrect", mockPartition.partition, values.get(1));
assertEquals("Offset is incorrect", offset, values.get(2));
}
}
-
- @Test
- public void generateTuplesWithValueSchemeAndMessageAndMetadata() {
- String value = "value";
- Partition mockPartition = Mockito.mock(Partition.class);
- mockPartition.partition = 0;
- Long offset = 0L;
-
- config.scheme = new SchemeAsMultiScheme(new StringScheme());
- config.tupleMetaData = true;
-
- createTopicAndSendMessage(null, value);
- ByteBufferMessageSet messageAndOffsets = getLastMessage();
- for (MessageAndOffset msg : messageAndOffsets) {
- Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
- List<Object> values = lists.iterator().next();
- assertEquals("Incorrect number of tuple values", 1, values.size());
- assertEquals("Message is incorrect", value, values.get(0));
- }
- }
private ByteBufferMessageSet getLastMessage() {
long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;