You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:33 UTC

[34/50] [abbrv] git commit: fixed KafkaSpout partition assignment

fixed KafkaSpout partition assignment

* added partitions -> task mapping logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2f45866c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2f45866c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2f45866c

Branch: refs/heads/master
Commit: 2f45866c8e011ac4804c940ff9e1d7c147591761
Parents: 09ae973
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Mon Mar 31 07:35:22 2014 +0100
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Mon Mar 31 07:35:22 2014 +0100

----------------------------------------------------------------------
 CHANGELOG.md                                |   2 +
 src/jvm/storm/kafka/KafkaUtils.java         |  31 +++++-
 src/jvm/storm/kafka/StaticCoordinator.java  |   7 +-
 src/jvm/storm/kafka/ZkCoordinator.java      |  37 ++++---
 src/test/storm/kafka/KafkaUtilsTest.java    |  39 +++++++
 src/test/storm/kafka/TestUtils.java         |  20 ++++
 src/test/storm/kafka/ZkCoordinatorTest.java | 128 +++++++++++++++++++++++
 7 files changed, 238 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 74ab824..133ffda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+## 0.5.0
+* fixed partition assignment for KafkaSpout
 ## 0.4.0
 * added support for reading kafka message keys
 * configurable metrics emit interval

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index eed438f..4e8b3a3 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -2,6 +2,7 @@ package storm.kafka;
 
 import backtype.storm.metric.api.IMetric;
 import backtype.storm.utils.Utils;
+import com.google.common.base.Preconditions;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -13,6 +14,7 @@ import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import storm.kafka.trident.GlobalPartitionInformation;
 import storm.kafka.trident.IBrokerReader;
 import storm.kafka.trident.StaticBrokerReader;
 import storm.kafka.trident.ZkBrokerReader;
@@ -39,7 +41,7 @@ public class KafkaUtils {
 
     public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
         long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        if ( config.forceFromStart ) {
+        if (config.forceFromStart) {
             startOffsetTime = config.startOffsetTime;
         }
         return getOffset(consumer, topic, partition, startOffsetTime);
@@ -91,7 +93,7 @@ public class KafkaUtils {
                             LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                             return null;
                         }
-                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); 
+                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                         long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                         if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
                             LOG.warn("No data found in Kafka Partition " + partition.getId());
@@ -184,4 +186,29 @@ public class KafkaUtils {
         return tups;
     }
 
+
+    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
+        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less than total tasks");
+        List<Partition> partitions = partitionInformation.getOrderedPartitions();
+        int numPartitions = partitions.size();
+        if (numPartitions < totalTasks) {
+            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+        }
+        List<Partition> taskPartitions = new ArrayList<Partition>();
+        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
+            Partition taskPartition = partitions.get(i);
+            taskPartitions.add(taskPartition);
+        }
+        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+        return taskPartitions;
+    }
+
+    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
+        String taskPrefix = "[" + taskIndex + "/" + totalTasks + "] --> ";
+        if (taskPartitions.isEmpty()) {
+            LOG.warn(taskPrefix + "no partitions assigned");
+        } else {
+            LOG.info(taskPrefix + "assigned " + taskPartitions);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticCoordinator.java b/src/jvm/storm/kafka/StaticCoordinator.java
index 7415522..040060c 100644
--- a/src/jvm/storm/kafka/StaticCoordinator.java
+++ b/src/jvm/storm/kafka/StaticCoordinator.java
@@ -12,13 +12,10 @@ public class StaticCoordinator implements PartitionCoordinator {
 
     public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
-        List<Partition> partitions = hosts.getPartitionInformation().getOrderedPartitions();
-        for (int i = taskIndex; i < partitions.size(); i += totalTasks) {
-            Partition myPartition = partitions.get(i);
+        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
+        for (Partition myPartition : myPartitions) {
             _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
-
         }
-
         _allManagers = new ArrayList(_managers.values());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkCoordinator.java b/src/jvm/storm/kafka/ZkCoordinator.java
index 98e51a3..35d2c57 100644
--- a/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/src/jvm/storm/kafka/ZkCoordinator.java
@@ -1,6 +1,5 @@
 package storm.kafka;
 
-import backtype.storm.task.IMetricsContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.GlobalPartitionInformation;
@@ -22,9 +21,12 @@ public class ZkCoordinator implements PartitionCoordinator {
     DynamicBrokersReader _reader;
     ZkState _state;
     Map _stormConf;
-    IMetricsContext _metricsContext;
 
     public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
+    }
+
+    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
         _spoutConfig = spoutConfig;
         _connections = connections;
         _taskIndex = taskIndex;
@@ -32,11 +34,14 @@ public class ZkCoordinator implements PartitionCoordinator {
         _topologyInstanceId = topologyInstanceId;
         _stormConf = stormConf;
         _state = state;
-
         ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
         _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
-        _reader = new DynamicBrokersReader(stormConf, brokerConf.brokerZkStr, brokerConf.brokerZkPath, spoutConfig.topic);
+        _reader = reader;
+    }
 
+    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
+        ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
+        return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
     }
 
     @Override
@@ -50,14 +55,9 @@ public class ZkCoordinator implements PartitionCoordinator {
 
     void refresh() {
         try {
-            LOG.info("Refreshing partition manager connections");
+            LOG.info(taskIdentifier() + "Refreshing partition manager connections");
             GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
-            Set<Partition> mine = new HashSet();
-            for (Partition partitionId : brokerInfo) {
-                if (myOwnership(partitionId)) {
-                    mine.add(partitionId);
-                }
-            }
+            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
 
             Set<Partition> curr = _managers.keySet();
             Set<Partition> newPartitions = new HashSet<Partition>(mine);
@@ -66,13 +66,13 @@ public class ZkCoordinator implements PartitionCoordinator {
             Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
             deletedPartitions.removeAll(mine);
 
-            LOG.info("Deleted partition managers: " + deletedPartitions.toString());
+            LOG.info(taskIdentifier() + "Deleted partition managers: " + deletedPartitions.toString());
 
             for (Partition id : deletedPartitions) {
                 PartitionManager man = _managers.remove(id);
                 man.close();
             }
-            LOG.info("New partition managers: " + newPartitions.toString());
+            LOG.info(taskIdentifier() + "New partition managers: " + newPartitions.toString());
 
             for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
@@ -83,16 +83,15 @@ public class ZkCoordinator implements PartitionCoordinator {
             throw new RuntimeException(e);
         }
         _cachedList = new ArrayList<PartitionManager>(_managers.values());
-        LOG.info("Finished refreshing");
+        LOG.info(taskIdentifier() + "Finished refreshing");
+    }
+
+    private String taskIdentifier() {
+        return "[" + _taskIndex + "/" + _totalTasks + "] - ";
     }
 
     @Override
     public PartitionManager getManager(Partition partition) {
         return _managers.get(partition);
     }
-
-    private boolean myOwnership(Partition id) {
-        int val = Math.abs(id.host.hashCode() + 23 * id.partition);
-        return val % _totalTasks == _taskIndex;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index 0763042..a4e7f52 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -178,4 +178,43 @@ public class KafkaUtilsTest {
         Producer<String, String> producer = new Producer<String, String>(producerConfig);
         producer.send(new KeyedMessage<String, String>(config.topic, key, value));
     }
+
+
+    @Test
+    public void assignOnePartitionPerTask() {
+        runPartitionToTaskMappingTest(16, 1);
+    }
+
+    @Test
+    public void assignTwoPartitionsPerTask() {
+        runPartitionToTaskMappingTest(16, 2);
+    }
+
+    @Test
+    public void assignAllPartitionsToOneTask() {
+        runPartitionToTaskMappingTest(32, 32);
+    }
+
+
+    public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
+        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
+        int numTasks = numPartitions / partitionsPerTask;
+        for (int i = 0 ; i < numTasks ; i++) {
+            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, numTasks, i).size());
+        }
+    }
+
+    @Test
+    public void moreTasksThanPartitions() {
+        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1);
+        int numTasks = 2;
+        assertEquals(1, KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, numTasks, 0).size());
+        assertEquals(0, KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, numTasks, 1).size());
+    }
+
+    @Test (expected = IllegalArgumentException.class )
+    public void assignInvalidTask() {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+        KafkaUtils.calculatePartitionsForTask(globalPartitionInformation, 1, 1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/TestUtils.java b/src/test/storm/kafka/TestUtils.java
new file mode 100644
index 0000000..860d96d
--- /dev/null
+++ b/src/test/storm/kafka/TestUtils.java
@@ -0,0 +1,20 @@
+package storm.kafka;
+
+import storm.kafka.trident.GlobalPartitionInformation;
+
+public class TestUtils {
+
+    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions) {
+        return buildPartitionInfo(numPartitions, 9092);
+    }
+
+
+    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+        for (int i = 0; i < numPartitions; i++) {
+            globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + ":" + brokerPort));
+        }
+        return globalPartitionInformation;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2f45866c/src/test/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/ZkCoordinatorTest.java b/src/test/storm/kafka/ZkCoordinatorTest.java
new file mode 100644
index 0000000..35b3b4b
--- /dev/null
+++ b/src/test/storm/kafka/ZkCoordinatorTest.java
@@ -0,0 +1,128 @@
+package storm.kafka;
+
+import backtype.storm.Config;
+import com.netflix.curator.test.TestingServer;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.when;
+
+public class ZkCoordinatorTest {
+
+
+    @Mock
+    private DynamicBrokersReader reader;
+
+    @Mock
+    private DynamicPartitionConnections dynamicPartitionConnections;
+
+    private KafkaTestBroker broker = new KafkaTestBroker();
+    private TestingServer server;
+    private Map stormConf = new HashMap();
+    private SpoutConfig spoutConfig;
+    private ZkState state;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        server = new TestingServer();
+        String connectionString = server.getConnectString();
+        ZkHosts hosts = new ZkHosts(connectionString);
+        hosts.refreshFreqSecs = 1;
+        spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
+        Map conf = buildZookeeperConfig(server);
+        state = new ZkState(conf);
+        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+        when(dynamicPartitionConnections.register(any(Broker.class), anyInt())).thenReturn(simpleConsumer);
+    }
+
+    private Map buildZookeeperConfig(TestingServer server) {
+        Map conf = new HashMap();
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+        return conf;
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        broker.shutdown();
+        server.stop();
+    }
+
+    @Test
+    public void testOnePartitionPerTask() throws Exception {
+        int totalTasks = 64;
+        int partitionsPerTask = 1;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks));
+        for (ZkCoordinator coordinator : coordinatorList) {
+            List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions();
+            assertEquals(partitionsPerTask, myManagedPartitions.size());
+            assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition);
+        }
+    }
+
+
+    @Test
+    public void testPartitionsChange() throws Exception {
+        final int totalTasks = 64;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks, 9092));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks, 9093));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
+        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
+        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+            List<PartitionManager> partitionManagersAfter = iterator.next();
+            assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask);
+        }
+    }
+
+    private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
+        assertEquals(partitionsPerTask, partitionManagersBefore.size());
+        assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());
+        for (int i = 0; i < partitionsPerTask; i++) {
+            assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition());
+        }
+
+    }
+
+    private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) {
+        List<List<PartitionManager>> partitions = new ArrayList();
+        for (ZkCoordinator coordinator : coordinatorList) {
+            partitions.add(coordinator.getMyManagedPartitions());
+        }
+        return partitions;
+    }
+
+    private void waitForRefresh() throws InterruptedException {
+        Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1);
+    }
+
+    private List<ZkCoordinator> buildCoordinators(int totalTasks) {
+        List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
+        for (int i = 0; i < totalTasks; i++) {
+            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+            coordinatorList.add(coordinator);
+        }
+        return coordinatorList;
+    }
+
+
+}