You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/02 23:19:23 UTC

[16/17] storm git commit: STORM-817. Kafka wildcard topic support. Upmerge.

STORM-817. Kafka wildcard topic support. Upmerge.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c097329
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c097329
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c097329

Branch: refs/heads/master
Commit: 4c097329187412f75e80a657effa997e2656923f
Parents: fe88200
Author: Sumit Chawla <su...@gmail.com>
Authored: Sun Nov 1 19:27:42 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Nov 2 14:01:48 2015 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 2 +-
 external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java  | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4c097329/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index faf34d9..d1b9f48 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -145,7 +145,7 @@ public class PartitionManager {
             if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
                 tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
             } else {
-                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, partition.topic);
+                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition.topic);
             }
             
             if ((tups != null) && tups.iterator().hasNext()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4c097329/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 2b5c8f7..eb694bb 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -102,7 +103,7 @@ public class KafkaUtilsTest {
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
         config.useStartOffsetTimeIfOffsetOutOfRange = false;
         KafkaUtils.fetchMessages(config, simpleConsumer,
-                new Partition(Broker.fromString(broker.getBrokerConnectionString()),TEST_TOPIC, 0), -99);
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99);
     }
 
     @Test(expected = TopicOffsetOutOfRangeException.class)
@@ -263,8 +264,7 @@ public class KafkaUtilsTest {
     public void assignAllPartitionsToOneTask() {
         runPartitionToTaskMappingTest(32, 32);
     }
-
-
+    
     public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
         GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();