You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:02 UTC
[03/50] [abbrv] use consistent formatting
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/src/jvm/storm/kafka/trident/ZkBrokerReader.java
index 0a53137..5e2361d 100644
--- a/src/jvm/storm/kafka/trident/ZkBrokerReader.java
+++ b/src/jvm/storm/kafka/trident/ZkBrokerReader.java
@@ -10,9 +10,9 @@ import java.util.Map;
public class ZkBrokerReader implements IBrokerReader {
- public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+ public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
- GlobalPartitionInformation cachedBrokers;
+ GlobalPartitionInformation cachedBrokers;
DynamicBrokersReader reader;
long lastRefreshTimeMs;
@@ -30,16 +30,16 @@ public class ZkBrokerReader implements IBrokerReader {
@Override
public GlobalPartitionInformation getCurrentBrokers() {
long currTime = System.currentTimeMillis();
- if(currTime > lastRefreshTimeMs + refreshMillis) {
- LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
+ if (currTime > lastRefreshTimeMs + refreshMillis) {
+ LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
- }
+ }
return cachedBrokers;
}
@Override
public void close() {
reader.close();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/test/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/DynamicBrokersReaderTest.java b/src/test/storm/kafka/DynamicBrokersReaderTest.java
index a6c2309..fd90c3c 100644
--- a/src/test/storm/kafka/DynamicBrokersReaderTest.java
+++ b/src/test/storm/kafka/DynamicBrokersReaderTest.java
@@ -21,133 +21,133 @@ import static org.junit.Assert.assertEquals;
* Time: 20:35
*/
public class DynamicBrokersReaderTest {
- private DynamicBrokersReader dynamicBrokersReader;
- private String masterPath = "/brokers";
- private String topic = "testing";
- private CuratorFramework zookeeper;
- private TestingServer server;
-
- @Before
- public void setUp() throws Exception {
- server = new TestingServer();
- String connectionString = server.getConnectString();
- Map conf = new HashMap();
- conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
- conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
- conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
- zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
- dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
- zookeeper.start();
- }
-
- @After
- public void tearDown() throws Exception {
- server.close();
- }
-
- private void addPartition(int id, String host, int port) throws Exception {
- writePartitionId(id);
- writeLeader(id, 0);
- writeLeaderDetails(0, host, port);
- }
-
- private void addPartition(int id, int leader, String host, int port) throws Exception {
- writePartitionId(id);
- writeLeader(id, leader);
- writeLeaderDetails(leader, host, port);
- }
-
- private void writePartitionId(int id) throws Exception {
- String path = dynamicBrokersReader.partitionPath();
- writeDataToPath(path, ("" + id));
- }
-
- private void writeDataToPath(String path, String data) throws Exception {
- ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
- zookeeper.setData().forPath(path, data.getBytes());
- }
-
- private void writeLeader(int id, int leaderId) throws Exception {
- String path = dynamicBrokersReader.partitionPath() + "/" + id + "/state";
- String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" +leaderId + ", \"leader_epoch\":1, \"version\":1 }";
- writeDataToPath(path, value);
- }
-
- private void writeLeaderDetails(int leaderId, String host, int port) throws Exception{
- String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
- String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
- writeDataToPath(path, value);
- }
-
- @Test
- public void testGetBrokerInfo() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- addPartition(partition, host, port);
- GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(1,brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
- }
-
-
- @Test
- public void testMultiplePartitionsOnDifferentHosts() throws Exception {
- String host = "localhost";
- int port = 9092;
- int secondPort = 9093;
- int partition = 0;
- int secondPartition = partition + 1;
- addPartition(partition, 0, host, port);
- addPartition(secondPartition, 1, host, secondPort);
-
- GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(2,brokerInfo.getOrderedPartitions().size());
-
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
-
- assertEquals(secondPort, brokerInfo.getHostFor(secondPartition).port);
- assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
- }
-
-
- @Test
- public void testMultiplePartitionsOnSameHost() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- int secondPartition = partition + 1;
- addPartition(partition, 0, host, port);
- addPartition(secondPartition, 0, host, port);
-
- GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(2,brokerInfo.getOrderedPartitions().size());
-
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
-
- assertEquals(port, brokerInfo.getHostFor(secondPartition).port);
- assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
- }
-
- @Test
- public void testSwitchHostForPartition() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- addPartition(partition, host, port);
- GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
-
- String newHost = host + "switch";
- int newPort = port + 1;
- addPartition(partition, newHost, newPort);
- brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(newPort, brokerInfo.getHostFor(partition).port);
- assertEquals(newHost, brokerInfo.getHostFor(partition).host);
- }
+ private DynamicBrokersReader dynamicBrokersReader;
+ private String masterPath = "/brokers";
+ private String topic = "testing";
+ private CuratorFramework zookeeper;
+ private TestingServer server;
+
+ @Before
+ public void setUp() throws Exception {
+ server = new TestingServer();
+ String connectionString = server.getConnectString();
+ Map conf = new HashMap();
+ conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+ dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
+ zookeeper.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.close();
+ }
+
+ private void addPartition(int id, String host, int port) throws Exception {
+ writePartitionId(id);
+ writeLeader(id, 0);
+ writeLeaderDetails(0, host, port);
+ }
+
+ private void addPartition(int id, int leader, String host, int port) throws Exception {
+ writePartitionId(id);
+ writeLeader(id, leader);
+ writeLeaderDetails(leader, host, port);
+ }
+
+ private void writePartitionId(int id) throws Exception {
+ String path = dynamicBrokersReader.partitionPath();
+ writeDataToPath(path, ("" + id));
+ }
+
+ private void writeDataToPath(String path, String data) throws Exception {
+ ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
+ zookeeper.setData().forPath(path, data.getBytes());
+ }
+
+ private void writeLeader(int id, int leaderId) throws Exception {
+ String path = dynamicBrokersReader.partitionPath() + "/" + id + "/state";
+ String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
+ writeDataToPath(path, value);
+ }
+
+ private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
+ String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
+ String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
+ writeDataToPath(path, value);
+ }
+
+ @Test
+ public void testGetBrokerInfo() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ addPartition(partition, host, port);
+ GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+ assertEquals(1, brokerInfo.getOrderedPartitions().size());
+ assertEquals(port, brokerInfo.getHostFor(partition).port);
+ assertEquals(host, brokerInfo.getHostFor(partition).host);
+ }
+
+
+ @Test
+ public void testMultiplePartitionsOnDifferentHosts() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int secondPort = 9093;
+ int partition = 0;
+ int secondPartition = partition + 1;
+ addPartition(partition, 0, host, port);
+ addPartition(secondPartition, 1, host, secondPort);
+
+ GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+ assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+ assertEquals(port, brokerInfo.getHostFor(partition).port);
+ assertEquals(host, brokerInfo.getHostFor(partition).host);
+
+ assertEquals(secondPort, brokerInfo.getHostFor(secondPartition).port);
+ assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+ }
+
+
+ @Test
+ public void testMultiplePartitionsOnSameHost() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ int secondPartition = partition + 1;
+ addPartition(partition, 0, host, port);
+ addPartition(secondPartition, 0, host, port);
+
+ GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+ assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+ assertEquals(port, brokerInfo.getHostFor(partition).port);
+ assertEquals(host, brokerInfo.getHostFor(partition).host);
+
+ assertEquals(port, brokerInfo.getHostFor(secondPartition).port);
+ assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+ }
+
+ @Test
+ public void testSwitchHostForPartition() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ addPartition(partition, host, port);
+ GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+ assertEquals(port, brokerInfo.getHostFor(partition).port);
+ assertEquals(host, brokerInfo.getHostFor(partition).host);
+
+ String newHost = host + "switch";
+ int newPort = port + 1;
+ addPartition(partition, newHost, newPort);
+ brokerInfo = dynamicBrokersReader.getBrokerInfo();
+ assertEquals(newPort, brokerInfo.getHostFor(partition).port);
+ assertEquals(newHost, brokerInfo.getHostFor(partition).host);
+ }
}