You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:33 UTC
[34/50] [abbrv] git commit: fixed KafkaSpout partition assignment
fixed KafkaSpout partition assignment
* added partitions -> task mapping logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2f45866c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2f45866c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2f45866c
Branch: refs/heads/master
Commit: 2f45866c8e011ac4804c940ff9e1d7c147591761
Parents: 09ae973
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Mon Mar 31 07:35:22 2014 +0100
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Mon Mar 31 07:35:22 2014 +0100
----------------------------------------------------------------------
CHANGELOG.md | 2 +
src/jvm/storm/kafka/KafkaUtils.java | 31 +++++-
src/jvm/storm/kafka/StaticCoordinator.java | 7 +-
src/jvm/storm/kafka/ZkCoordinator.java | 37 ++++---
src/test/storm/kafka/KafkaUtilsTest.java | 39 +++++++
src/test/storm/kafka/TestUtils.java | 20 ++++
src/test/storm/kafka/ZkCoordinatorTest.java | 128 +++++++++++++++++++++++
7 files changed, 238 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 74ab824..133ffda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+## 0.5.0
+* fixed partition assignment for KafkaSpout
## 0.4.0
* added support for reading kafka message keys
* configurable metrics emit interval
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index eed438f..4e8b3a3 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -2,6 +2,7 @@ package storm.kafka;
import backtype.storm.metric.api.IMetric;
import backtype.storm.utils.Utils;
+import com.google.common.base.Preconditions;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
@@ -13,6 +14,7 @@ import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.trident.IBrokerReader;
import storm.kafka.trident.StaticBrokerReader;
import storm.kafka.trident.ZkBrokerReader;
@@ -39,7 +41,7 @@ public class KafkaUtils {
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
- if ( config.forceFromStart ) {
+ if (config.forceFromStart) {
startOffsetTime = config.startOffsetTime;
}
return getOffset(consumer, topic, partition, startOffsetTime);
@@ -91,7 +93,7 @@ public class KafkaUtils {
LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
return null;
}
- long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+ long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
LOG.warn("No data found in Kafka Partition " + partition.getId());
@@ -184,4 +186,29 @@ public class KafkaUtils {
return tups;
}
+
+ public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
+ Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less than total tasks");
+ List<Partition> partitions = partitionInformation.getOrderedPartitions();
+ int numPartitions = partitions.size();
+ if (numPartitions < totalTasks) {
+ LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+ }
+ List<Partition> taskPartitions = new ArrayList<Partition>();
+ for (int i = taskIndex; i < numPartitions; i += totalTasks) {
+ Partition taskPartition = partitions.get(i);
+ taskPartitions.add(taskPartition);
+ }
+ logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+ return taskPartitions;
+ }
+
+ private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
+ String taskPrefix = "[" + taskIndex + "/" + totalTasks + "] --> ";
+ if (taskPartitions.isEmpty()) {
+ LOG.warn(taskPrefix + "no partitions assigned");
+ } else {
+ LOG.info(taskPrefix + "assigned " + taskPartitions);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticCoordinator.java b/src/jvm/storm/kafka/StaticCoordinator.java
index 7415522..040060c 100644
--- a/src/jvm/storm/kafka/StaticCoordinator.java
+++ b/src/jvm/storm/kafka/StaticCoordinator.java
@@ -12,13 +12,10 @@ public class StaticCoordinator implements PartitionCoordinator {
public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
- List<Partition> partitions = hosts.getPartitionInformation().getOrderedPartitions();
- for (int i = taskIndex; i < partitions.size(); i += totalTasks) {
- Partition myPartition = partitions.get(i);
+ List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
+ for (Partition myPartition : myPartitions) {
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
-
}
-
_allManagers = new ArrayList(_managers.values());
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkCoordinator.java b/src/jvm/storm/kafka/ZkCoordinator.java
index 98e51a3..35d2c57 100644
--- a/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/src/jvm/storm/kafka/ZkCoordinator.java
@@ -1,6 +1,5 @@
package storm.kafka;
-import backtype.storm.task.IMetricsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;
@@ -22,9 +21,12 @@ public class ZkCoordinator implements PartitionCoordinator {
DynamicBrokersReader _reader;
ZkState _state;
Map _stormConf;
- IMetricsContext _metricsContext;
public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+ this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
+ }
+
+ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
_spoutConfig = spoutConfig;
_connections = connections;
_taskIndex = taskIndex;
@@ -32,11 +34,14 @@ public class ZkCoordinator implements PartitionCoordinator {
_topologyInstanceId = topologyInstanceId;
_stormConf = stormConf;
_state = state;
-
ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
_refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
- _reader = new DynamicBrokersReader(stormConf, brokerConf.brokerZkStr, brokerConf.brokerZkPath, spoutConfig.topic);
+ _reader = reader;
+ }
+ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
+ ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
+ return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
}
@Override
@@ -50,14 +55,9 @@ public class ZkCoordinator implements PartitionCoordinator {
void refresh() {
try {
- LOG.info("Refreshing partition manager connections");
+ LOG.info(taskIdentifier() + "Refreshing partition manager connections");
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
- Set<Partition> mine = new HashSet();
- for (Partition partitionId : brokerInfo) {
- if (myOwnership(partitionId)) {
- mine.add(partitionId);
- }
- }
+ List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
Set<Partition> curr = _managers.keySet();
Set<Partition> newPartitions = new HashSet<Partition>(mine);
@@ -66,13 +66,13 @@ public class ZkCoordinator implements PartitionCoordinator {
Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
deletedPartitions.removeAll(mine);
- LOG.info("Deleted partition managers: " + deletedPartitions.toString());
+ LOG.info(taskIdentifier() + "Deleted partition managers: " + deletedPartitions.toString());
for (Partition id : deletedPartitions) {
PartitionManager man = _managers.remove(id);
man.close();
}
- LOG.info("New partition managers: " + newPartitions.toString());
+ LOG.info(taskIdentifier() + "New partition managers: " + newPartitions.toString());
for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
@@ -83,16 +83,15 @@ public class ZkCoordinator implements PartitionCoordinator {
throw new RuntimeException(e);
}
_cachedList = new ArrayList<PartitionManager>(_managers.values());
- LOG.info("Finished refreshing");
+ LOG.info(taskIdentifier() + "Finished refreshing");
+ }
+
+ private String taskIdentifier() {
+ return "[" + _taskIndex + "/" + _totalTasks + "] - ";
}
@Override
public PartitionManager getManager(Partition partition) {
return _managers.get(partition);
}
-
- private boolean myOwnership(Partition id) {
- int val = Math.abs(id.host.hashCode() + 23 * id.partition);
- return val % _totalTasks == _taskIndex;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index 0763042..a4e7f52 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -178,4 +178,43 @@ public class KafkaUtilsTest {
Producer<String, String> producer = new Producer<String, String>(producerConfig);
producer.send(new KeyedMessage<String, String>(config.topic, key, value));
}
+
+
+ @Test
+ public void assignOnePartitionPerTask() {
+ runPartitionToTaskMappingTest(16, 1);
+ }
+
+ @Test
+ public void assignTwoPartitionsPerTask() {
+ runPartitionToTaskMappingTest(16, 2);
+ }
+
+ @Test
+ public void assignAllPartitionsToOneTask() {
+ runPartitionToTaskMappingTest(32, 32);
+ }
+
+
+ public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
+ GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
+ int numTasks = numPartitions / partitionsPerTask;
+ for (int i = 0 ; i < numTasks ; i++) {
+ assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, numTasks, i).size());
+ }
+ }
+
+ @Test
+ public void moreTasksThanPartitions() {
+ GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1);
+ int numTasks = 2;
+ assertEquals(1, KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, numTasks, 0).size());
+ assertEquals(0, KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, numTasks, 1).size());
+ }
+
+ @Test (expected = IllegalArgumentException.class )
+ public void assignInvalidTask() {
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+ KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, 1, 1);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/TestUtils.java b/src/test/storm/kafka/TestUtils.java
new file mode 100644
index 0000000..860d96d
--- /dev/null
+++ b/src/test/storm/kafka/TestUtils.java
@@ -0,0 +1,20 @@
+package storm.kafka;
+
+import storm.kafka.trident.GlobalPartitionInformation;
+
+public class TestUtils {
+
+ public static GlobalPartitionInformation buildPartitionInfo(int numPartitions) {
+ return buildPartitionInfo(numPartitions, 9092);
+ }
+
+
+ public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+ for (int i = 0; i < numPartitions; i++) {
+ globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort));
+ }
+ return globalPartitionInformation;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/test/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/ZkCoordinatorTest.java b/src/test/storm/kafka/ZkCoordinatorTest.java
new file mode 100644
index 0000000..35b3b4b
--- /dev/null
+++ b/src/test/storm/kafka/ZkCoordinatorTest.java
@@ -0,0 +1,128 @@
+package storm.kafka;
+
+import backtype.storm.Config;
+import com.netflix.curator.test.TestingServer;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.when;
+
+public class ZkCoordinatorTest {
+
+
+ @Mock
+ private DynamicBrokersReader reader;
+
+ @Mock
+ private DynamicPartitionConnections dynamicPartitionConnections;
+
+ private KafkaTestBroker broker = new KafkaTestBroker();
+ private TestingServer server;
+ private Map stormConf = new HashMap();
+ private SpoutConfig spoutConfig;
+ private ZkState state;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ server = new TestingServer();
+ String connectionString = server.getConnectString();
+ ZkHosts hosts = new ZkHosts(connectionString);
+ hosts.refreshFreqSecs = 1;
+ spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
+ Map conf = buildZookeeperConfig(server);
+ state = new ZkState(conf);
+ SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+ when(dynamicPartitionConnections.register(any(Broker.class), anyInt())).thenReturn(simpleConsumer);
+ }
+
+ private Map buildZookeeperConfig(TestingServer server) {
+ Map conf = new HashMap();
+ conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
+ conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+ conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+ return conf;
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ broker.shutdown();
+ server.stop();
+ }
+
+ @Test
+ public void testOnePartitionPerTask() throws Exception {
+ int totalTasks = 64;
+ int partitionsPerTask = 1;
+ List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks));
+ for (ZkCoordinator coordinator : coordinatorList) {
+ List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions();
+ assertEquals(partitionsPerTask, myManagedPartitions.size());
+ assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition);
+ }
+ }
+
+
+ @Test
+ public void testPartitionsChange() throws Exception {
+ final int totalTasks = 64;
+ int partitionsPerTask = 2;
+ List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks, 9092));
+ List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+ waitForRefresh();
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks, 9093));
+ List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+ assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
+ Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
+ for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+ List<PartitionManager> partitionManagersAfter = iterator.next();
+ assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask);
+ }
+ }
+
+ private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
+ assertEquals(partitionsPerTask, partitionManagersBefore.size());
+ assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());
+ for (int i = 0; i < partitionsPerTask; i++) {
+ assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition());
+ }
+
+ }
+
+ private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) {
+ List<List<PartitionManager>> partitions = new ArrayList();
+ for (ZkCoordinator coordinator : coordinatorList) {
+ partitions.add(coordinator.getMyManagedPartitions());
+ }
+ return partitions;
+ }
+
+ private void waitForRefresh() throws InterruptedException {
+ Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1);
+ }
+
+ private List<ZkCoordinator> buildCoordinators(int totalTasks) {
+ List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
+ for (int i = 0; i < totalTasks; i++) {
+ ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+ coordinatorList.add(coordinator);
+ }
+ return coordinatorList;
+ }
+
+
+}