You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:02 UTC

[03/50] [abbrv] use consistent formatting

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/src/jvm/storm/kafka/trident/ZkBrokerReader.java
index 0a53137..5e2361d 100644
--- a/src/jvm/storm/kafka/trident/ZkBrokerReader.java
+++ b/src/jvm/storm/kafka/trident/ZkBrokerReader.java
@@ -10,9 +10,9 @@ import java.util.Map;
 
 public class ZkBrokerReader implements IBrokerReader {
 
-	public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+    public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
 
-	GlobalPartitionInformation cachedBrokers;
+    GlobalPartitionInformation cachedBrokers;
     DynamicBrokersReader reader;
     long lastRefreshTimeMs;
 
@@ -30,16 +30,16 @@ public class ZkBrokerReader implements IBrokerReader {
     @Override
     public GlobalPartitionInformation getCurrentBrokers() {
         long currTime = System.currentTimeMillis();
-        if(currTime > lastRefreshTimeMs + refreshMillis) {
-			LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
+        if (currTime > lastRefreshTimeMs + refreshMillis) {
+            LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
             cachedBrokers = reader.getBrokerInfo();
             lastRefreshTimeMs = currTime;
-		}
+        }
         return cachedBrokers;
     }
 
     @Override
     public void close() {
         reader.close();
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/test/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/DynamicBrokersReaderTest.java b/src/test/storm/kafka/DynamicBrokersReaderTest.java
index a6c2309..fd90c3c 100644
--- a/src/test/storm/kafka/DynamicBrokersReaderTest.java
+++ b/src/test/storm/kafka/DynamicBrokersReaderTest.java
@@ -21,133 +21,133 @@ import static org.junit.Assert.assertEquals;
  * Time: 20:35
  */
 public class DynamicBrokersReaderTest {
-	private DynamicBrokersReader dynamicBrokersReader;
-	private String masterPath = "/brokers";
-	private String topic = "testing";
-	private CuratorFramework zookeeper;
-	private TestingServer server;
-
-	@Before
-	public void setUp() throws Exception {
-		server = new TestingServer();
-		String connectionString = server.getConnectString();
-		Map conf = new HashMap();
-		conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
-		conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
-		conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
-		zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
-		dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-		zookeeper.start();
-	}
-
-	@After
-	public void tearDown() throws Exception {
-   		server.close();
-	}
-
-	private void addPartition(int id, String host, int port) throws Exception {
-		writePartitionId(id);
-		writeLeader(id, 0);
-		writeLeaderDetails(0, host, port);
-	}
-
-	private void addPartition(int id, int leader, String host, int port) throws Exception {
-		writePartitionId(id);
-		writeLeader(id, leader);
-		writeLeaderDetails(leader, host, port);
-	}
-
-	private void writePartitionId(int id) throws Exception {
-		String path = dynamicBrokersReader.partitionPath();
-		writeDataToPath(path, ("" + id));
-	}
-
-	private void writeDataToPath(String path, String data) throws Exception {
-		ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
-		zookeeper.setData().forPath(path, data.getBytes());
-	}
-
-	private void writeLeader(int id, int leaderId) throws Exception {
-		String path = dynamicBrokersReader.partitionPath() + "/" + id + "/state";
-		String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" +leaderId + ", \"leader_epoch\":1, \"version\":1 }";
-		writeDataToPath(path, value);
-	}
-
-	private void writeLeaderDetails(int leaderId, String host, int port) throws Exception{
-		String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
-		String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
-		writeDataToPath(path, value);
-	}
-
-	@Test
-	public void testGetBrokerInfo() throws Exception {
-		String host = "localhost";
-		int port = 9092;
-		int partition = 0;
-		addPartition(partition, host, port);
-		GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
-		assertEquals(1,brokerInfo.getOrderedPartitions().size());
-		assertEquals(port, brokerInfo.getHostFor(partition).port);
-		assertEquals(host, brokerInfo.getHostFor(partition).host);
-	}
-
-
-	@Test
-	public void testMultiplePartitionsOnDifferentHosts() throws Exception {
-		String host = "localhost";
-		int port = 9092;
-		int secondPort = 9093;
-		int partition = 0;
-		int secondPartition = partition + 1;
-		addPartition(partition, 0, host, port);
-		addPartition(secondPartition, 1,  host, secondPort);
-
-		GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
-		assertEquals(2,brokerInfo.getOrderedPartitions().size());
-
-		assertEquals(port, brokerInfo.getHostFor(partition).port);
-		assertEquals(host, brokerInfo.getHostFor(partition).host);
-
-		assertEquals(secondPort, brokerInfo.getHostFor(secondPartition).port);
-		assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
-	}
-
-
-	@Test
-	public void testMultiplePartitionsOnSameHost() throws Exception {
-		String host = "localhost";
-		int port = 9092;
-		int partition = 0;
-		int secondPartition = partition + 1;
-		addPartition(partition, 0, host, port);
-		addPartition(secondPartition, 0,  host, port);
-
-		GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
-		assertEquals(2,brokerInfo.getOrderedPartitions().size());
-
-		assertEquals(port, brokerInfo.getHostFor(partition).port);
-		assertEquals(host, brokerInfo.getHostFor(partition).host);
-
-		assertEquals(port, brokerInfo.getHostFor(secondPartition).port);
-		assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
-	}
-
-	@Test
-	public void testSwitchHostForPartition() throws Exception {
-		String host = "localhost";
-		int port = 9092;
-		int partition = 0;
-		addPartition(partition, host, port);
-		GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
-		assertEquals(port, brokerInfo.getHostFor(partition).port);
-		assertEquals(host, brokerInfo.getHostFor(partition).host);
-
-		String newHost = host + "switch";
-		int newPort = port + 1;
-		addPartition(partition, newHost, newPort);
-		brokerInfo = dynamicBrokersReader.getBrokerInfo();
-		assertEquals(newPort, brokerInfo.getHostFor(partition).port);
-		assertEquals(newHost, brokerInfo.getHostFor(partition).host);
-	}
+    private DynamicBrokersReader dynamicBrokersReader;
+    private String masterPath = "/brokers";
+    private String topic = "testing";
+    private CuratorFramework zookeeper;
+    private TestingServer server;
+
+    @Before
+    public void setUp() throws Exception {
+        server = new TestingServer();
+        String connectionString = server.getConnectString();
+        Map conf = new HashMap();
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+        dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
+        zookeeper.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        server.close();
+    }
+
+    private void addPartition(int id, String host, int port) throws Exception {
+        writePartitionId(id);
+        writeLeader(id, 0);
+        writeLeaderDetails(0, host, port);
+    }
+
+    private void addPartition(int id, int leader, String host, int port) throws Exception {
+        writePartitionId(id);
+        writeLeader(id, leader);
+        writeLeaderDetails(leader, host, port);
+    }
+
+    private void writePartitionId(int id) throws Exception {
+        String path = dynamicBrokersReader.partitionPath();
+        writeDataToPath(path, ("" + id));
+    }
+
+    private void writeDataToPath(String path, String data) throws Exception {
+        ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
+        zookeeper.setData().forPath(path, data.getBytes());
+    }
+
+    private void writeLeader(int id, int leaderId) throws Exception {
+        String path = dynamicBrokersReader.partitionPath() + "/" + id + "/state";
+        String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
+        writeDataToPath(path, value);
+    }
+
+    private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
+        String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
+        String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
+        writeDataToPath(path, value);
+    }
+
+    @Test
+    public void testGetBrokerInfo() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        addPartition(partition, host, port);
+        GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+        assertEquals(1, brokerInfo.getOrderedPartitions().size());
+        assertEquals(port, brokerInfo.getHostFor(partition).port);
+        assertEquals(host, brokerInfo.getHostFor(partition).host);
+    }
+
+
+    @Test
+    public void testMultiplePartitionsOnDifferentHosts() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int secondPort = 9093;
+        int partition = 0;
+        int secondPartition = partition + 1;
+        addPartition(partition, 0, host, port);
+        addPartition(secondPartition, 1, host, secondPort);
+
+        GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+        assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+        assertEquals(port, brokerInfo.getHostFor(partition).port);
+        assertEquals(host, brokerInfo.getHostFor(partition).host);
+
+        assertEquals(secondPort, brokerInfo.getHostFor(secondPartition).port);
+        assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+    }
+
+
+    @Test
+    public void testMultiplePartitionsOnSameHost() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        int secondPartition = partition + 1;
+        addPartition(partition, 0, host, port);
+        addPartition(secondPartition, 0, host, port);
+
+        GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+        assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+        assertEquals(port, brokerInfo.getHostFor(partition).port);
+        assertEquals(host, brokerInfo.getHostFor(partition).host);
+
+        assertEquals(port, brokerInfo.getHostFor(secondPartition).port);
+        assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+    }
+
+    @Test
+    public void testSwitchHostForPartition() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        addPartition(partition, host, port);
+        GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
+        assertEquals(port, brokerInfo.getHostFor(partition).port);
+        assertEquals(host, brokerInfo.getHostFor(partition).host);
+
+        String newHost = host + "switch";
+        int newPort = port + 1;
+        addPartition(partition, newHost, newPort);
+        brokerInfo = dynamicBrokersReader.getBrokerInfo();
+        assertEquals(newPort, brokerInfo.getHostFor(partition).port);
+        assertEquals(newHost, brokerInfo.getHostFor(partition).host);
+    }
 }