You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/17 11:48:13 UTC

kafka git commit: KAFKA-2757; Consolidate BrokerEndPoint and EndPoint

Repository: kafka
Updated Branches:
  refs/heads/trunk 2faf9f60c -> 3382b6db7


KAFKA-2757; Consolidate BrokerEndPoint and EndPoint

Author: zhuchen1018 <am...@gmail.com>

Reviewers: Dong Lin <li...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #911 from zhuchen1018/KAFKA-2757


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3382b6db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3382b6db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3382b6db

Branch: refs/heads/trunk
Commit: 3382b6db7b2b9c17a4ccfd9ebe840741bcf44670
Parents: 2faf9f6
Author: Chen Zhu <am...@gmail.com>
Authored: Wed Feb 17 18:48:00 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Feb 17 18:48:00 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kafka/common/BrokerEndPoint.java | 88 ++++++++++++++++++++
 .../common/requests/LeaderAndIsrRequest.java    | 31 +++----
 .../common/requests/UpdateMetadataRequest.java  | 18 +---
 .../common/requests/RequestResponseTest.java    | 13 +--
 .../controller/ControllerChannelManager.scala   |  6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  4 +-
 7 files changed, 115 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
new file mode 100644
index 0000000..d5275c4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+
+/**
+ * Broker id, host and port
+ */
+public final class BrokerEndPoint implements Serializable {
+
+    private int hash = 0;
+    private final int id;
+    private final String host;
+    private final int port;
+
+    public BrokerEndPoint(int id, String host, int port) {
+        this.id = id;
+        this.host = host;
+        this.port = port;
+    }
+
+    public int id() {
+        return id;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0)
+            return hash;
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + id;
+        result = prime * result + ((host == null) ? 0 : host.hashCode());
+        result = prime * result + port;
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        BrokerEndPoint other = (BrokerEndPoint) obj;
+        if (id != other.id)
+            return false;
+        if (port != other.port)
+            return false;
+        if (host == null) {
+            if (other.host != null)
+                return false;
+        } else if (!host.equals(other.host))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + id + ", " + host + ":" + port + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index a77a7cb..264af90 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.BrokerEndPoint;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -53,18 +54,6 @@ public class LeaderAndIsrRequest extends AbstractRequest {
 
     }
 
-    public static final class EndPoint {
-        public final int id;
-        public final String host;
-        public final int port;
-
-        public EndPoint(int id, String host, int port) {
-            this.id = id;
-            this.host = host;
-            this.port = port;
-        }
-    }
-
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
 
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
@@ -89,10 +78,10 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private final int controllerId;
     private final int controllerEpoch;
     private final Map<TopicPartition, PartitionState> partitionStates;
-    private final Set<EndPoint> liveLeaders;
+    private final Set<BrokerEndPoint> liveLeaders;
 
     public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
-                               Set<EndPoint> liveLeaders) {
+                               Set<BrokerEndPoint> liveLeaders) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
@@ -115,11 +104,11 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
 
         List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
-        for (EndPoint leader : liveLeaders) {
+        for (BrokerEndPoint leader : liveLeaders) {
             Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
-            leaderData.set(END_POINT_ID_KEY_NAME, leader.id);
-            leaderData.set(HOST_KEY_NAME, leader.host);
-            leaderData.set(PORT_KEY_NAME, leader.port);
+            leaderData.set(END_POINT_ID_KEY_NAME, leader.id());
+            leaderData.set(HOST_KEY_NAME, leader.host());
+            leaderData.set(PORT_KEY_NAME, leader.port());
             leadersData.add(leaderData);
         }
         struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray());
@@ -159,13 +148,13 @@ public class LeaderAndIsrRequest extends AbstractRequest {
 
         }
 
-        Set<EndPoint> leaders = new HashSet<>();
+        Set<BrokerEndPoint> leaders = new HashSet<>();
         for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) {
             Struct leadersData = (Struct) leadersDataObj;
             int id = leadersData.getInt(END_POINT_ID_KEY_NAME);
             String host = leadersData.getString(HOST_KEY_NAME);
             int port = leadersData.getInt(PORT_KEY_NAME);
-            leaders.add(new EndPoint(id, host, port));
+            leaders.add(new BrokerEndPoint(id, host, port));
         }
 
         controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
@@ -202,7 +191,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         return partitionStates;
     }
 
-    public Set<EndPoint> liveLeaders() {
+    public Set<BrokerEndPoint> liveLeaders() {
         return liveLeaders;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 808161c..d8d8013 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.BrokerEndPoint;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -71,19 +72,6 @@ public class UpdateMetadataRequest extends AbstractRequest {
         }
     }
 
-    @Deprecated
-    public static final class BrokerEndPoint {
-        public final int id;
-        public final String host;
-        public final int port;
-
-        public BrokerEndPoint(int id, String host, int port) {
-            this.id = id;
-            this.host = host;
-            this.port = port;
-        }
-    }
-
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.UPDATE_METADATA_KEY.id);
 
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
@@ -128,8 +116,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
         Set<Broker> brokers = new HashSet<>(brokerEndPoints.size());
         for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
             Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
-                    new EndPoint(brokerEndPoint.host, brokerEndPoint.port));
-            brokers.add(new Broker(brokerEndPoint.id, endPoints));
+                    new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
+            brokers.add(new Broker(brokerEndPoint.id(), endPoints));
         }
         return brokers;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index db9c81a..5fc5551 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.BrokerEndPoint;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -353,9 +354,9 @@ public class RequestResponseTest {
         partitionStates.put(new TopicPartition("topic20", 1),
                 new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
-        Set<LeaderAndIsrRequest.EndPoint> leaders = new HashSet<>(Arrays.asList(
-                new LeaderAndIsrRequest.EndPoint(0, "test0", 1223),
-                new LeaderAndIsrRequest.EndPoint(1, "test1", 1223)
+        Set<BrokerEndPoint> leaders = new HashSet<>(Arrays.asList(
+                new BrokerEndPoint(0, "test0", 1223),
+                new BrokerEndPoint(1, "test1", 1223)
         ));
 
         return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
@@ -379,9 +380,9 @@ public class RequestResponseTest {
                 new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
         if (version == 0) {
-            Set<UpdateMetadataRequest.BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList(
-                    new UpdateMetadataRequest.BrokerEndPoint(0, "host1", 1223),
-                    new UpdateMetadataRequest.BrokerEndPoint(1, "host2", 1234)
+            Set<BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList(
+                    new BrokerEndPoint(0, "host1", 1223),
+                    new BrokerEndPoint(1, "host2", 1234)
             ));
 
             return new UpdateMetadataRequest(1, 10, liveBrokers, partitionStates);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e52a9d3..02ba814 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,7 @@ package kafka.controller
 import kafka.api.{LeaderAndIsr, KAFKA_090, PartitionStateInfo}
 import kafka.utils._
 import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode}
 import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
@@ -352,7 +352,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { b =>
           val brokerEndPoint = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)
-          new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+          new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
         }
         val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -386,7 +386,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           if (version == 0) {
             val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
               val brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
-              new UpdateMetadataRequest.BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+              new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
             }
             new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index db2040f..33027e7 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.{TopicPartition, requests}
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 
@@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   private def createLeaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
       Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
-      Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava)
+      Set(new BrokerEndPoint(brokerId,"localhost", 0)).asJava)
   }
 
   private def createStopReplicaRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 704f776..94013bc 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
 import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 
 import scala.collection.JavaConverters._
@@ -132,7 +132,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
     val brokerEndPoints = brokers.map { b =>
       val brokerEndPoint = b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
-      new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+      new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
     }
 
     val controllerContext = new ControllerContext(zkUtils, 6000)