You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/18 16:59:16 UTC

kafka git commit: KAFKA-6051; Close the ReplicaFetcherBlockingSend earlier on shutdown

Repository: kafka
Updated Branches:
  refs/heads/trunk b71ee043f -> 5c1a85caa


KAFKA-6051; Close the ReplicaFetcherBlockingSend earlier on shutdown

Rearranged the testAddPartitionDuringDeleteTopic() test to keep the
likelyhood of the race condition.

Author: Maytee Chinavanichkit <ma...@linecorp.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #4056 from mayt/KAFKA-6051


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c1a85ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c1a85ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c1a85ca

Branch: refs/heads/trunk
Commit: 5c1a85caa074267fd84a2fed3662cbe1af93738d
Parents: b71ee04
Author: Maytee Chinavanichkit <ma...@linecorp.com>
Authored: Wed Oct 18 09:59:13 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 18 09:59:13 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 9 ++++++---
 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala  | 3 ++-
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c1a85ca/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index a8acde4..3bc68da 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -79,9 +79,12 @@ class ReplicaFetcherThread(name: String,
 
   private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
 
-  override def shutdown(): Unit = {
-    super.shutdown()
-    leaderEndpoint.close()
+  override def initiateShutdown(): Boolean = {
+    val justShutdown = super.initiateShutdown()
+    if (justShutdown) {
+      leaderEndpoint.close()
+    }
+    justShutdown
   }
 
   // process fetched data

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c1a85ca/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 50fa21e..1ca5500 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -178,11 +178,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testAddPartitionDuringDeleteTopic() {
     val topic = "test"
     servers = createTestTopicAndCluster(topic)
+    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     // add partitions to topic
     val newPartition = new TopicPartition(topic, 1)
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
       Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either