You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/22 12:39:01 UTC
kafka git commit: KAFKA-5289: handleStopReplica should not send a second response
Repository: kafka
Updated Branches:
refs/heads/trunk d3f1407f7 -> ceb10c533
KAFKA-5289: handleStopReplica should not send a second response
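`shutdownIdleFetcherThreads()` is called after the StopReplica response has
been sent and can throw (for example, an InterruptedException). Swallow any
such exception so that it cannot trigger a second response for the same request.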
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3096 from ijuma/kafka-5289-stop-replica-should-not-send-two-responses
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ceb10c53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ceb10c53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ceb10c53
Branch: refs/heads/trunk
Commit: ceb10c53302bd4824737e03ab86ca19e2aa64c7a
Parents: d3f1407
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon May 22 13:37:29 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Mon May 22 13:37:29 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ceb10c53/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1346fb3..197298c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,7 +35,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
import kafka.log.{Log, LogManager, TimestampOffset}
import kafka.network.{RequestChannel, RequestOrResponseSend}
import kafka.security.auth._
-import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -195,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
// is not cleared.
result.foreach { case (topicPartition, error) =>
- if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
+ if (error == Errors.NONE && stopReplicaRequest.deletePartitions && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
groupCoordinator.handleGroupEmigration(topicPartition.partition)
}
}
@@ -207,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
}
- replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
+ CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads())
}
def handleUpdateMetadataRequest(request: RequestChannel.Request) {