You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/17 11:48:13 UTC
kafka git commit: KAFKA-2757; Consolidate BrokerEndPoint and EndPoint
Repository: kafka
Updated Branches:
refs/heads/trunk 2faf9f60c -> 3382b6db7
KAFKA-2757; Consolidate BrokerEndPoint and EndPoint
Author: zhuchen1018 <am...@gmail.com>
Reviewers: Dong Lin <li...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #911 from zhuchen1018/KAFKA-2757
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3382b6db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3382b6db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3382b6db
Branch: refs/heads/trunk
Commit: 3382b6db7b2b9c17a4ccfd9ebe840741bcf44670
Parents: 2faf9f6
Author: Chen Zhu <am...@gmail.com>
Authored: Wed Feb 17 18:48:00 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Feb 17 18:48:00 2016 +0800
----------------------------------------------------------------------
.../org/apache/kafka/common/BrokerEndPoint.java | 88 ++++++++++++++++++++
.../common/requests/LeaderAndIsrRequest.java | 31 +++----
.../common/requests/UpdateMetadataRequest.java | 18 +---
.../common/requests/RequestResponseTest.java | 13 +--
.../controller/ControllerChannelManager.scala | 6 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +-
.../unit/kafka/server/LeaderElectionTest.scala | 4 +-
7 files changed, 115 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
new file mode 100644
index 0000000..d5275c4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+
+/**
+ * Broker id, host and port
+ */
+public final class BrokerEndPoint implements Serializable {
+
+ private int hash = 0;
+ private final int id;
+ private final String host;
+ private final int port;
+
+ public BrokerEndPoint(int id, String host, int port) {
+ this.id = id;
+ this.host = host;
+ this.port = port;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public String host() {
+ return host;
+ }
+
+ public int port() {
+ return port;
+ }
+
+ @Override
+ public int hashCode() {
+ if (hash != 0)
+ return hash;
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + id;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ this.hash = result;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ BrokerEndPoint other = (BrokerEndPoint) obj;
+ if (id != other.id)
+ return false;
+ if (port != other.port)
+ return false;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + id + ", " + host + ":" + port + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index a77a7cb..264af90 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.BrokerEndPoint;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -53,18 +54,6 @@ public class LeaderAndIsrRequest extends AbstractRequest {
}
- public static final class EndPoint {
- public final int id;
- public final String host;
- public final int port;
-
- public EndPoint(int id, String host, int port) {
- this.id = id;
- this.host = host;
- this.port = port;
- }
- }
-
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
@@ -89,10 +78,10 @@ public class LeaderAndIsrRequest extends AbstractRequest {
private final int controllerId;
private final int controllerEpoch;
private final Map<TopicPartition, PartitionState> partitionStates;
- private final Set<EndPoint> liveLeaders;
+ private final Set<BrokerEndPoint> liveLeaders;
public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
- Set<EndPoint> liveLeaders) {
+ Set<BrokerEndPoint> liveLeaders) {
super(new Struct(CURRENT_SCHEMA));
struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
@@ -115,11 +104,11 @@ public class LeaderAndIsrRequest extends AbstractRequest {
struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
- for (EndPoint leader : liveLeaders) {
+ for (BrokerEndPoint leader : liveLeaders) {
Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
- leaderData.set(END_POINT_ID_KEY_NAME, leader.id);
- leaderData.set(HOST_KEY_NAME, leader.host);
- leaderData.set(PORT_KEY_NAME, leader.port);
+ leaderData.set(END_POINT_ID_KEY_NAME, leader.id());
+ leaderData.set(HOST_KEY_NAME, leader.host());
+ leaderData.set(PORT_KEY_NAME, leader.port());
leadersData.add(leaderData);
}
struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray());
@@ -159,13 +148,13 @@ public class LeaderAndIsrRequest extends AbstractRequest {
}
- Set<EndPoint> leaders = new HashSet<>();
+ Set<BrokerEndPoint> leaders = new HashSet<>();
for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) {
Struct leadersData = (Struct) leadersDataObj;
int id = leadersData.getInt(END_POINT_ID_KEY_NAME);
String host = leadersData.getString(HOST_KEY_NAME);
int port = leadersData.getInt(PORT_KEY_NAME);
- leaders.add(new EndPoint(id, host, port));
+ leaders.add(new BrokerEndPoint(id, host, port));
}
controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
@@ -202,7 +191,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
return partitionStates;
}
- public Set<EndPoint> liveLeaders() {
+ public Set<BrokerEndPoint> liveLeaders() {
return liveLeaders;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 808161c..d8d8013 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -13,6 +13,7 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.BrokerEndPoint;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -71,19 +72,6 @@ public class UpdateMetadataRequest extends AbstractRequest {
}
}
- @Deprecated
- public static final class BrokerEndPoint {
- public final int id;
- public final String host;
- public final int port;
-
- public BrokerEndPoint(int id, String host, int port) {
- this.id = id;
- this.host = host;
- this.port = port;
- }
- }
-
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.UPDATE_METADATA_KEY.id);
private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
@@ -128,8 +116,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
Set<Broker> brokers = new HashSet<>(brokerEndPoints.size());
for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
- new EndPoint(brokerEndPoint.host, brokerEndPoint.port));
- brokers.add(new Broker(brokerEndPoint.id, endPoints));
+ new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
+ brokers.add(new Broker(brokerEndPoint.id(), endPoints));
}
return brokers;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index db9c81a..5fc5551 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -13,6 +13,7 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.BrokerEndPoint;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -353,9 +354,9 @@ public class RequestResponseTest {
partitionStates.put(new TopicPartition("topic20", 1),
new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
- Set<LeaderAndIsrRequest.EndPoint> leaders = new HashSet<>(Arrays.asList(
- new LeaderAndIsrRequest.EndPoint(0, "test0", 1223),
- new LeaderAndIsrRequest.EndPoint(1, "test1", 1223)
+ Set<BrokerEndPoint> leaders = new HashSet<>(Arrays.asList(
+ new BrokerEndPoint(0, "test0", 1223),
+ new BrokerEndPoint(1, "test1", 1223)
));
return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
@@ -379,9 +380,9 @@ public class RequestResponseTest {
new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
if (version == 0) {
- Set<UpdateMetadataRequest.BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList(
- new UpdateMetadataRequest.BrokerEndPoint(0, "host1", 1223),
- new UpdateMetadataRequest.BrokerEndPoint(1, "host2", 1234)
+ Set<BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList(
+ new BrokerEndPoint(0, "host1", 1223),
+ new BrokerEndPoint(1, "host2", 1234)
));
return new UpdateMetadataRequest(1, 10, liveBrokers, partitionStates);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e52a9d3..02ba814 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,7 @@ package kafka.controller
import kafka.api.{LeaderAndIsr, KAFKA_090, PartitionStateInfo}
import kafka.utils._
import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode}
import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
@@ -352,7 +352,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { b =>
val brokerEndPoint = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)
- new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+ new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
}
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -386,7 +386,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
if (version == 0) {
val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
val brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
- new UpdateMetadataRequest.BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+ new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
}
new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index db2040f..33027e7 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.{TopicPartition, requests}
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
private def createLeaderAndIsrRequest = {
new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
- Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava)
+ Set(new BrokerEndPoint(brokerId,"localhost", 0)).asJava)
}
private def createStopReplicaRequest = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 704f776..94013bc 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
import scala.collection.JavaConverters._
@@ -132,7 +132,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
val brokerEndPoints = brokers.map { b =>
val brokerEndPoint = b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
- new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+ new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
}
val controllerContext = new ControllerContext(zkUtils, 6000)