You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:05 UTC

[06/50] [abbrv] git commit: Renamed HostPort to Broker

Renamed HostPort to Broker


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/735b87f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/735b87f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/735b87f7

Branch: refs/heads/master
Commit: 735b87f78459ec686017c538ef50d95a7db9584b
Parents: da18bd8
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Wed Dec 25 16:28:16 2013 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Wed Dec 25 16:28:16 2013 +0000

----------------------------------------------------------------------
 src/jvm/storm/kafka/Broker.java                 | 56 ++++++++++++++++++++
 src/jvm/storm/kafka/DynamicBrokersReader.java   |  8 +--
 .../kafka/DynamicPartitionConnections.java      | 10 ++--
 src/jvm/storm/kafka/HostPort.java               | 56 --------------------
 src/jvm/storm/kafka/Partition.java              |  4 +-
 .../storm/kafka/StaticPartitionConnections.java |  2 +-
 .../trident/GlobalPartitionInformation.java     | 17 +++---
 .../storm/kafka/DynamicBrokersReaderTest.java   | 28 +++++-----
 8 files changed, 91 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Broker.java b/src/jvm/storm/kafka/Broker.java
new file mode 100644
index 0000000..66e6112
--- /dev/null
+++ b/src/jvm/storm/kafka/Broker.java
@@ -0,0 +1,98 @@
+package storm.kafka;
+
+import java.io.Serializable;
+
+/**
+ * Immutable host/port pair identifying a Kafka broker.
+ *
+ * <p>Equality, hashing and ordering all take both {@code host} and {@code port}
+ * into account, so instances behave consistently as map keys and when sorted.
+ */
+public class Broker implements Serializable, Comparable<Broker> {
+    /** Port used when a broker is specified by host only. */
+    private static final int DEFAULT_PORT = 9092;
+
+    public final String host;
+    public final int port;
+
+    /**
+     * Creates a broker for the given host and port.
+     *
+     * @param host broker host name or address
+     * @param port broker port
+     */
+    public Broker(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * Creates a broker on the default port (9092).
+     *
+     * @param host broker host name or address
+     */
+    public Broker(String host) {
+        this(host, DEFAULT_PORT);
+    }
+
+    /**
+     * Two brokers are equal when both host and port match.
+     *
+     * @return {@code false} for {@code null} or any object that is not a {@code Broker}
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // Guard the cast: equals() must not throw for null or foreign types.
+        if (!(o instanceof Broker)) {
+            return false;
+        }
+        Broker other = (Broker) o;
+        return port == other.port && host.equals(other.host);
+    }
+
+    /** Mixes in the port so brokers sharing a host do not all hash alike. */
+    @Override
+    public int hashCode() {
+        return 31 * host.hashCode() + port;
+    }
+
+    /** @return the broker formatted as {@code host:port} */
+    @Override
+    public String toString() {
+        return host + ":" + port;
+    }
+
+    /**
+     * Parses a broker specification of the form {@code host} or {@code host:port}.
+     *
+     * @param host the specification; the port defaults to 9092 when omitted
+     * @return the parsed broker
+     * @throws IllegalArgumentException if the text does not split into one or two
+     *         {@code ':'}-separated parts, or if the port is not a valid integer
+     */
+    public static Broker fromString(String host) {
+        String[] spec = host.split(":");
+        switch (spec.length) {
+            case 1:
+                return new Broker(spec[0]);
+            case 2:
+                return new Broker(spec[0], Integer.parseInt(spec[1]));
+            default:
+                throw new IllegalArgumentException("Invalid host specification: " + host);
+        }
+    }
+
+    /** Orders brokers by host, then by port. */
+    @Override
+    public int compareTo(Broker o) {
+        int byHost = this.host.compareTo(o.host);
+        if (byHost != 0) {
+            return byHost;
+        }
+        // Explicit comparison: subtracting ports can overflow for extreme values.
+        return this.port < o.port ? -1 : (this.port == o.port ? 0 : 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicBrokersReader.java b/src/jvm/storm/kafka/DynamicBrokersReader.java
index c802baf..5b1d750 100644
--- a/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -51,8 +51,8 @@ public class DynamicBrokersReader {
                 int leader = getLeaderFor(partition);
                 String path = brokerInfoPath + "/" + leader;
                 try {
-                    byte[] hostPortData = _curator.getData().forPath(path);
-                    HostPort hp = getBrokerHost(hostPortData);
+                    byte[] brokerData = _curator.getData().forPath(path);
+                    Broker hp = getBrokerHost(brokerData);
                     globalPartitionInformation.addPartition(partition, hp);
                 } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                     LOG.error("Node {} does not exist ", path);
@@ -114,12 +114,12 @@ public class DynamicBrokersReader {
      * @param contents
      * @return
      */
-    private HostPort getBrokerHost(byte[] contents) {
+    private Broker getBrokerHost(byte[] contents) {
         try {
             Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
             String host = (String) value.get("host");
             Integer port = ((Long) value.get("port")).intValue();
-            return new HostPort(host, port);
+            return new Broker(host, port);
         } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicPartitionConnections.java b/src/jvm/storm/kafka/DynamicPartitionConnections.java
index 7a799a0..8d0115b 100644
--- a/src/jvm/storm/kafka/DynamicPartitionConnections.java
+++ b/src/jvm/storm/kafka/DynamicPartitionConnections.java
@@ -24,7 +24,7 @@ public class DynamicPartitionConnections {
         }
     }
 
-    Map<HostPort, ConnectionInfo> _connections = new HashMap();
+    Map<Broker, ConnectionInfo> _connections = new HashMap();
     KafkaConfig _config;
     IBrokerReader _reader;
 
@@ -34,11 +34,11 @@ public class DynamicPartitionConnections {
     }
 
     public SimpleConsumer register(Partition partition) {
-        HostPort hostPort = _reader.getCurrentBrokers().getHostFor(partition.partition);
-        return register(hostPort, partition.partition);
+        Broker broker = _reader.getCurrentBrokers().getBrokerFor(partition.partition);
+        return register(broker, partition.partition);
     }
 
-    public SimpleConsumer register(HostPort host, int partition) {
+    public SimpleConsumer register(Broker host, int partition) {
         if (!_connections.containsKey(host)) {
             _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
         }
@@ -55,7 +55,7 @@ public class DynamicPartitionConnections {
         return null;
     }
 
-    public void unregister(HostPort port, int partition) {
+    public void unregister(Broker port, int partition) {
         ConnectionInfo info = _connections.get(port);
         info.partitions.remove(partition);
         if (info.partitions.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/HostPort.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/HostPort.java b/src/jvm/storm/kafka/HostPort.java
deleted file mode 100644
index 5369858..0000000
--- a/src/jvm/storm/kafka/HostPort.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package storm.kafka;
-
-import java.io.Serializable;
-
-public class HostPort implements Serializable, Comparable<HostPort> {
-    public String host;
-    public int port;
-
-    public HostPort(String host, int port) {
-        this.host = host;
-        this.port = port;
-    }
-
-    public HostPort(String host) {
-        this(host, 9092);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        HostPort other = (HostPort) o;
-        return host.equals(other.host) && port == other.port;
-    }
-
-    @Override
-    public int hashCode() {
-        return host.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return host + ":" + port;
-    }
-
-    public static HostPort fromString(String host) {
-        HostPort hp;
-        String[] spec = host.split(":");
-        if (spec.length == 1) {
-            hp = new HostPort(spec[0]);
-        } else if (spec.length == 2) {
-            hp = new HostPort(spec[0], Integer.parseInt(spec[1]));
-        } else {
-            throw new IllegalArgumentException("Invalid host specification: " + host);
-        }
-        return hp;
-    }
-
-
-    @Override
-    public int compareTo(HostPort o) {
-        if (this.host.equals(o.host)) {
-            return this.port - o.port;
-        } else {
-            return this.host.compareTo(o.host);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Partition.java b/src/jvm/storm/kafka/Partition.java
index 87ab7b8..bbb4fbb 100644
--- a/src/jvm/storm/kafka/Partition.java
+++ b/src/jvm/storm/kafka/Partition.java
@@ -6,10 +6,10 @@ import storm.trident.spout.ISpoutPartition;
 
 public class Partition implements ISpoutPartition {
 
-    public final HostPort host;
+    public final Broker host;
     public final int partition;
 
-    public Partition(HostPort host, int partition) {
+    public Partition(Broker host, int partition) {
         this.host = host;
         this.partition = partition;
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticPartitionConnections.java b/src/jvm/storm/kafka/StaticPartitionConnections.java
index 4294362..a9b9db1 100644
--- a/src/jvm/storm/kafka/StaticPartitionConnections.java
+++ b/src/jvm/storm/kafka/StaticPartitionConnections.java
@@ -20,7 +20,7 @@ public class StaticPartitionConnections {
 
     public SimpleConsumer getConsumer(int partition) {
         if (!_kafka.containsKey(partition)) {
-            HostPort hp = hosts.getPartitionInformation().getHostFor(partition);
+            Broker hp = hosts.getPartitionInformation().getBrokerFor(partition);
             _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
index 6b0fdec..a790009 100644
--- a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
@@ -1,6 +1,7 @@
 package storm.kafka.trident;
 
-import storm.kafka.HostPort;
+import storm.kafka.Broker;
+import storm.kafka.Broker;
 import storm.kafka.Partition;
 
 import java.io.Serializable;
@@ -12,13 +13,13 @@ import java.util.*;
  */
 public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
 
-    private Map<Integer, HostPort> partitionMap;
+    private Map<Integer, Broker> partitionMap;
 
     public GlobalPartitionInformation() {
-        partitionMap = new TreeMap<Integer, HostPort>();
+        partitionMap = new TreeMap<Integer, Broker>();
     }
 
-    public void addPartition(int partitionId, HostPort broker) {
+    public void addPartition(int partitionId, Broker broker) {
         partitionMap.put(partitionId, broker);
     }
 
@@ -29,13 +30,13 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
                 '}';
     }
 
-    public HostPort getHostFor(Integer partitionId) {
+    public Broker getBrokerFor(Integer partitionId) {
         return partitionMap.get(partitionId);
     }
 
     public List<Partition> getOrderedPartitions() {
         List<Partition> partitions = new LinkedList<Partition>();
-        for (Map.Entry<Integer, HostPort> partition : partitionMap.entrySet()) {
+        for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
             partitions.add(new Partition(partition.getValue(), partition.getKey()));
         }
         return partitions;
@@ -43,7 +44,7 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
 
     @Override
     public Iterator<Partition> iterator() {
-        final Iterator<Map.Entry<Integer, HostPort>> iterator = partitionMap.entrySet().iterator();
+        final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
 
         return new Iterator<Partition>() {
             @Override
@@ -53,7 +54,7 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
 
             @Override
             public Partition next() {
-                Map.Entry<Integer, HostPort> next = iterator.next();
+                Map.Entry<Integer, Broker> next = iterator.next();
                 return new Partition(next.getValue(), next.getKey());
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/735b87f7/src/test/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/DynamicBrokersReaderTest.java b/src/test/storm/kafka/DynamicBrokersReaderTest.java
index fd90c3c..47387e3 100644
--- a/src/test/storm/kafka/DynamicBrokersReaderTest.java
+++ b/src/test/storm/kafka/DynamicBrokersReaderTest.java
@@ -88,8 +88,8 @@ public class DynamicBrokersReaderTest {
         addPartition(partition, host, port);
         GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
         assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertEquals(port, brokerInfo.getHostFor(partition).port);
-        assertEquals(host, brokerInfo.getHostFor(partition).host);
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
     }
 
 
@@ -106,11 +106,11 @@ public class DynamicBrokersReaderTest {
         GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
         assertEquals(2, brokerInfo.getOrderedPartitions().size());
 
-        assertEquals(port, brokerInfo.getHostFor(partition).port);
-        assertEquals(host, brokerInfo.getHostFor(partition).host);
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
 
-        assertEquals(secondPort, brokerInfo.getHostFor(secondPartition).port);
-        assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+        assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
     }
 
 
@@ -126,11 +126,11 @@ public class DynamicBrokersReaderTest {
         GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
         assertEquals(2, brokerInfo.getOrderedPartitions().size());
 
-        assertEquals(port, brokerInfo.getHostFor(partition).port);
-        assertEquals(host, brokerInfo.getHostFor(partition).host);
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
 
-        assertEquals(port, brokerInfo.getHostFor(secondPartition).port);
-        assertEquals(host, brokerInfo.getHostFor(secondPartition).host);
+        assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
     }
 
     @Test
@@ -140,14 +140,14 @@ public class DynamicBrokersReaderTest {
         int partition = 0;
         addPartition(partition, host, port);
         GlobalPartitionInformation brokerInfo = dynamicBrokersReader.getBrokerInfo();
-        assertEquals(port, brokerInfo.getHostFor(partition).port);
-        assertEquals(host, brokerInfo.getHostFor(partition).host);
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
 
         String newHost = host + "switch";
         int newPort = port + 1;
         addPartition(partition, newHost, newPort);
         brokerInfo = dynamicBrokersReader.getBrokerInfo();
-        assertEquals(newPort, brokerInfo.getHostFor(partition).port);
-        assertEquals(newHost, brokerInfo.getHostFor(partition).host);
+        assertEquals(newPort, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(newHost, brokerInfo.getBrokerFor(partition).host);
     }
 }