You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/18 16:59:16 UTC
kafka git commit: KAFKA-6051;
Close the ReplicaFetcherBlockingSend earlier on shutdown
Repository: kafka
Updated Branches:
refs/heads/trunk b71ee043f -> 5c1a85caa
KAFKA-6051; Close the ReplicaFetcherBlockingSend earlier on shutdown
Rearranged the testAddPartitionDuringDeleteTopic() test to keep the
likelihood of the race condition.
Author: Maytee Chinavanichkit <ma...@linecorp.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #4056 from mayt/KAFKA-6051
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c1a85ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c1a85ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c1a85ca
Branch: refs/heads/trunk
Commit: 5c1a85caa074267fd84a2fed3662cbe1af93738d
Parents: b71ee04
Author: Maytee Chinavanichkit <ma...@linecorp.com>
Authored: Wed Oct 18 09:59:13 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 18 09:59:13 2017 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 9 ++++++---
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 3 ++-
2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c1a85ca/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index a8acde4..3bc68da 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -79,9 +79,12 @@ class ReplicaFetcherThread(name: String,
private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)
- override def shutdown(): Unit = {
- super.shutdown()
- leaderEndpoint.close()
+ override def initiateShutdown(): Boolean = {
+ val justShutdown = super.initiateShutdown()
+ if (justShutdown) {
+ leaderEndpoint.close()
+ }
+ justShutdown
}
// process fetched data
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c1a85ca/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 50fa21e..1ca5500 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -178,11 +178,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
def testAddPartitionDuringDeleteTopic() {
val topic = "test"
servers = createTestTopicAndCluster(topic)
+ val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
// add partitions to topic
val newPartition = new TopicPartition(topic, 1)
- AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+ AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// verify that new partition doesn't exist on any broker either