You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:05 UTC
[06/50] [abbrv] git commit: Renamed HostPort to Broker
Renamed HostPort to Broker
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/735b87f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/735b87f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/735b87f7
Branch: refs/heads/master
Commit: 735b87f78459ec686017c538ef50d95a7db9584b
Parents: da18bd8
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Wed Dec 25 16:28:16 2013 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Wed Dec 25 16:28:16 2013 +0000
----------------------------------------------------------------------
src/jvm/storm/kafka/Broker.java | 56 ++++++++++++++++++++
src/jvm/storm/kafka/DynamicBrokersReader.java | 8 +--
.../kafka/DynamicPartitionConnections.java | 10 ++--
src/jvm/storm/kafka/HostPort.java | 56 --------------------
src/jvm/storm/kafka/Partition.java | 4 +-
.../storm/kafka/StaticPartitionConnections.java | 2 +-
.../trident/GlobalPartitionInformation.java | 17 +++---
.../storm/kafka/DynamicBrokersReaderTest.java | 28 +++++-----
8 files changed, 91 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Broker.java b/src/jvm/storm/kafka/Broker.java
new file mode 100644
index 0000000..66e6112
--- /dev/null
+++ b/src/jvm/storm/kafka/Broker.java
@@ -0,0 +1,56 @@
+package storm.kafka;
+
+import java.io.Serializable;
+
+public class Broker implements Serializable, Comparable<Broker> {
+ public final String host;
+ public final int port;
+
+ public Broker(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public Broker(String host) {
+ this(host, 9092);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ Broker other = (Broker) o;
+ return host.equals(other.host) && port == other.port;
+ }
+
+ @Override
+ public int hashCode() {
+ return host.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return host + ":" + port;
+ }
+
+ public static Broker fromString(String host) {
+ Broker hp;
+ String[] spec = host.split(":");
+ if (spec.length == 1) {
+ hp = new Broker(spec[0]);
+ } else if (spec.length == 2) {
+ hp = new Broker(spec[0], Integer.parseInt(spec[1]));
+ } else {
+ throw new IllegalArgumentException("Invalid host specification: " + host);
+ }
+ return hp;
+ }
+
+
+ @Override
+ public int compareTo(Broker o) {
+ if (this.host.equals(o.host)) {
+ return this.port - o.port;
+ } else {
+ return this.host.compareTo(o.host);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicBrokersReader.java b/src/jvm/storm/kafka/DynamicBrokersReader.java
index c802baf..5b1d750 100644
--- a/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -51,8 +51,8 @@ public class DynamicBrokersReader {
int leader = getLeaderFor(partition);
String path = brokerInfoPath + "/" + leader;
try {
- byte[] hostPortData = _curator.getData().forPath(path);
- HostPort hp = getBrokerHost(hostPortData);
+ byte[] brokerData = _curator.getData().forPath(path);
+ Broker hp = getBrokerHost(brokerData);
globalPartitionInformation.addPartition(partition, hp);
} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
LOG.error("Node {} does not exist ", path);
@@ -114,12 +114,12 @@ public class DynamicBrokersReader {
* @param contents
* @return
*/
- private HostPort getBrokerHost(byte[] contents) {
+ private Broker getBrokerHost(byte[] contents) {
try {
Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
String host = (String) value.get("host");
Integer port = ((Long) value.get("port")).intValue();
- return new HostPort(host, port);
+ return new Broker(host, port);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicPartitionConnections.java b/src/jvm/storm/kafka/DynamicPartitionConnections.java
index 7a799a0..8d0115b 100644
--- a/src/jvm/storm/kafka/DynamicPartitionConnections.java
+++ b/src/jvm/storm/kafka/DynamicPartitionConnections.java
@@ -24,7 +24,7 @@ public class DynamicPartitionConnections {
}
}
- Map<HostPort, ConnectionInfo> _connections = new HashMap();
+ Map<Broker, ConnectionInfo> _connections = new HashMap();
KafkaConfig _config;
IBrokerReader _reader;
@@ -34,11 +34,11 @@ public class DynamicPartitionConnections {
}
public SimpleConsumer register(Partition partition) {
- HostPort hostPort = _reader.getCurrentBrokers().getHostFor(partition.partition);
- return register(hostPort, partition.partition);
+ Broker broker = _reader.getCurrentBrokers().getBrokerFor(partition.partition);
+ return register(broker, partition.partition);
}
- public SimpleConsumer register(HostPort host, int partition) {
+ public SimpleConsumer register(Broker host, int partition) {
if (!_connections.containsKey(host)) {
_connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
}
@@ -55,7 +55,7 @@ public class DynamicPartitionConnections {
return null;
}
- public void unregister(HostPort port, int partition) {
+ public void unregister(Broker port, int partition) {
ConnectionInfo info = _connections.get(port);
info.partitions.remove(partition);
if (info.partitions.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/HostPort.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/HostPort.java b/src/jvm/storm/kafka/HostPort.java
deleted file mode 100644
index 5369858..0000000
--- a/src/jvm/storm/kafka/HostPort.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package storm.kafka;
-
-import java.io.Serializable;
-
-public class HostPort implements Serializable, Comparable<HostPort> {
- public String host;
- public int port;
-
- public HostPort(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public HostPort(String host) {
- this(host, 9092);
- }
-
- @Override
- public boolean equals(Object o) {
- HostPort other = (HostPort) o;
- return host.equals(other.host) && port == other.port;
- }
-
- @Override
- public int hashCode() {
- return host.hashCode();
- }
-
- @Override
- public String toString() {
- return host + ":" + port;
- }
-
- public static HostPort fromString(String host) {
- HostPort hp;
- String[] spec = host.split(":");
- if (spec.length == 1) {
- hp = new HostPort(spec[0]);
- } else if (spec.length == 2) {
- hp = new HostPort(spec[0], Integer.parseInt(spec[1]));
- } else {
- throw new IllegalArgumentException("Invalid host specification: " + host);
- }
- return hp;
- }
-
-
- @Override
- public int compareTo(HostPort o) {
- if (this.host.equals(o.host)) {
- return this.port - o.port;
- } else {
- return this.host.compareTo(o.host);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Partition.java b/src/jvm/storm/kafka/Partition.java
index 87ab7b8..bbb4fbb 100644
--- a/src/jvm/storm/kafka/Partition.java
+++ b/src/jvm/storm/kafka/Partition.java
@@ -6,10 +6,10 @@ import storm.trident.spout.ISpoutPartition;
public class Partition implements ISpoutPartition {
- public final HostPort host;
+ public final Broker host;
public final int partition;
- public Partition(HostPort host, int partition) {
+ public Partition(Broker host, int partition) {
this.host = host;
this.partition = partition;
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticPartitionConnections.java b/src/jvm/storm/kafka/StaticPartitionConnections.java
index 4294362..a9b9db1 100644
--- a/src/jvm/storm/kafka/StaticPartitionConnections.java
+++ b/src/jvm/storm/kafka/StaticPartitionConnections.java
@@ -20,7 +20,7 @@ public class StaticPartitionConnections {
public SimpleConsumer getConsumer(int partition) {
if (!_kafka.containsKey(partition)) {
- HostPort hp = hosts.getPartitionInformation().getHostFor(partition);
+ Broker hp = hosts.getPartitionInformation().getBrokerFor(partition);
_kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
index 6b0fdec..a790009 100644
--- a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
@@ -1,6 +1,7 @@
package storm.kafka.trident;
-import storm.kafka.HostPort;
+import storm.kafka.Broker;
+import storm.kafka.Broker;
import storm.kafka.Partition;
import java.io.Serializable;
@@ -12,13 +13,13 @@ import java.util.*;
*/
public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
- private Map<Integer, HostPort> partitionMap;
+ private Map<Integer, Broker> partitionMap;
public GlobalPartitionInformation() {
- partitionMap = new TreeMap<Integer, HostPort>();
+ partitionMap = new TreeMap<Integer, Broker>();
}
- public void addPartition(int partitionId, HostPort broker) {
+ public void addPartition(int partitionId, Broker broker) {
partitionMap.put(partitionId, broker);
}
@@ -29,13 +30,13 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
'}';
}
- public HostPort getHostFor(Integer partitionId) {
+ public Broker getBrokerFor(Integer partitionId) {
return partitionMap.get(partitionId);
}
public List<Partition> getOrderedPartitions() {
List<Partition> partitions = new LinkedList<Partition>();
- for (Map.Entry<Integer, HostPort> partition : partitionMap.entrySet()) {
+ for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
partitions.add(new Partition(partition.getValue(), partition.getKey()));
}
return partitions;
@@ -43,7 +44,7 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
@Override
public Iterator<Partition> iterator() {
- final Iterator<Map.Entry<Integer, HostPort>> iterator = partitionMap.entrySet().iterator();
+ final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
return new Iterator<Partition>() {
@Override
@@ -53,7 +54,7 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
@Override
public Partition next() {
- Map.Entry<Integer, HostPort> next = iterator.next();
+ Map.Entry<Integer, Broker> next = iterator.next();
return new Partition(next.getValue(), next.getKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/test/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/DynamicBrokersReaderTest.java b/src/test/storm/kafka/DynamicBrokersReaderTest.java
index fd90c3c..47387e3 100644
--- a/src/test/storm/kafka/DynamicBrokersReaderTest.java
+++ b/src/test/storm/kafka/DynamicBrokersReaderTest.java
@@ -88,8 +88,8 @@ public class DynamicBrokersReaderTest {
addPartition(partition, host, port);
GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
assertEquals(1, brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
}
@@ -106,11 +106,11 @@ public class DynamicBrokersReaderTest {
GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
assertEquals(2, brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
- assertEquals(secondPort, brokerInfo.getHostFor(secondPartition).port);
- assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+ assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
}
@@ -126,11 +126,11 @@ public class DynamicBrokersReaderTest {
GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
assertEquals(2, brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
- assertEquals(port, brokerInfo.getHostFor(secondPartition).port);
- assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+ assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
}
@Test
@@ -140,14 +140,14 @@ public class DynamicBrokersReaderTest {
int partition = 0;
addPartition(partition, host, port);
GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(port, brokerInfo.getHostFor(partition).port);
- assertEquals(host, brokerInfo.getHostFor(partition).host);
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
String newHost = host + "switch";
int newPort = port + 1;
addPartition(partition, newHost, newPort);
brokerInfo = dynamicBrokersReader.getBrokerInfo();
- assertEquals(newPort, brokerInfo.getHostFor(partition).port);
- assertEquals(newHost, brokerInfo.getHostFor(partition).host);
+ assertEquals(newPort, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(newHost, brokerInfo.getBrokerFor(partition).host);
}
}