You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/02 23:19:23 UTC
[16/17] storm git commit: STORM-817. Kafka wildcard topic support. Upmerge.
STORM-817. Kafka wildcard topic support. Upmerge.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c097329
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c097329
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c097329
Branch: refs/heads/master
Commit: 4c097329187412f75e80a657effa997e2656923f
Parents: fe88200
Author: Sumit Chawla <su...@gmail.com>
Authored: Sun Nov 1 19:27:42 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Nov 2 14:01:48 2015 -0800
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 2 +-
external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4c097329/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index faf34d9..d1b9f48 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -145,7 +145,7 @@ public class PartitionManager {
if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
} else {
- tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, partition.topic);
+ tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition.topic);
}
if ((tups != null) && tups.iterator().hasNext()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4c097329/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 2b5c8f7..eb694bb 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -102,7 +103,7 @@ public class KafkaUtilsTest {
public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
config.useStartOffsetTimeIfOffsetOutOfRange = false;
KafkaUtils.fetchMessages(config, simpleConsumer,
- new Partition(Broker.fromString(broker.getBrokerConnectionString()),TEST_TOPIC, 0), -99);
+ new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99);
}
@Test(expected = TopicOffsetOutOfRangeException.class)
@@ -263,8 +264,7 @@ public class KafkaUtilsTest {
public void assignAllPartitionsToOneTask() {
runPartitionToTaskMappingTest(32, 32);
}
-
-
+
public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();