You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:41 UTC
[02/14] storm git commit: STORM-697: Overload of generateTuples to
accept the Partition and offset
STORM-697: Overload of generateTuples to accept the Partition and offset
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e4fde20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e4fde20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e4fde20
Branch: refs/heads/master
Commit: 6e4fde20af8d285cdf4829e4c2c4aef4cd45d89d
Parents: 5b4c28a
Author: matt.tieman <ma...@inin.com>
Authored: Tue Mar 3 11:47:38 2015 -0500
Committer: matt.tieman <ma...@inin.com>
Committed: Tue Mar 3 11:47:38 2015 -0500
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6e4fde20/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..9af49fe 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -210,6 +210,21 @@ public class KafkaUtils {
}
return tups;
}
+
+ public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, int offset) {
+ Iterable<List<Object>> tups;
+ ByteBuffer payload = msg.payload();
+ if (payload == null) {
+ return null;
+ }
+
+ if (kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+ tups = ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
+ } else {
+ tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
+ }
+ return tups;
+ }
public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {