You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:41 UTC

[02/14] storm git commit: STORM-697: Overload of generateTuples to accept the Partition and offset

STORM-697: Overload of generateTuples to accept the Partition and offset


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e4fde20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e4fde20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e4fde20

Branch: refs/heads/master
Commit: 6e4fde20af8d285cdf4829e4c2c4aef4cd45d89d
Parents: 5b4c28a
Author: matt.tieman <ma...@inin.com>
Authored: Tue Mar 3 11:47:38 2015 -0500
Committer: matt.tieman <ma...@inin.com>
Committed: Tue Mar 3 11:47:38 2015 -0500

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e4fde20/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..9af49fe 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -210,6 +210,21 @@ public class KafkaUtils {
         }
         return tups;
     }
+    
+    /**
+     * Deserializes the payload of msg into tuples; returns null when msg carries no payload.
+     */
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, int offset) {
+        final ByteBuffer buffer = msg.payload();
+        if (buffer == null) {
+            return null;
+        }
+        if (!(kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme)) {
+            return kafkaConfig.scheme.deserialize(Utils.toByteArray(buffer));
+        }
+        // Only this scheme type is also handed the supplied partition and offset.
+        return ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(buffer), partition, offset);
+    }
 
 
     public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {