Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/12 20:02:18 UTC

[kafka] branch 2.5 updated (93a4820 -> 8e6d24a)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 93a4820  KAFKA-9423: Refine layout of configuration options on website and make individual settings directly linkable (#7955)
     new a736360  MINOR: Fix unnecessary metadata fetch before group assignment (#8095)
     new 8e6d24a  KAFKA-9499; Improve deletion process by batching more aggressively (#8053)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../consumer/ConsumerPartitionAssignor.java        |   8 ++
 .../consumer/internals/ConsumerCoordinator.java    |   3 -
 .../consumer/internals/SubscriptionState.java      |  12 ++-
 .../consumer/internals/SubscriptionStateTest.java  |  20 +++-
 .../scala/kafka/controller/ControllerContext.scala |  10 +-
 .../kafka/controller/TopicDeletionManager.scala    | 112 +++++++++++----------
 .../kafka/admin/ConsumerGroupCommandTest.scala     |  14 ++-
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala |  33 ++++--
 .../kafka/controller/ControllerContextTest.scala   |  15 ++-
 .../controller/MockPartitionStateMachine.scala     |  12 +++
 .../kafka/controller/MockReplicaStateMachine.scala |  12 +++
 .../controller/TopicDeletionManagerTest.scala      |  28 +++++-
 12 files changed, 181 insertions(+), 98 deletions(-)


[kafka] 02/02: KAFKA-9499; Improve deletion process by batching more aggressively (#8053)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8e6d24a50fd81612b7442c8728bb29b4b6b19418
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Feb 12 20:46:54 2020 +0100

    KAFKA-9499; Improve deletion process by batching more aggressively (#8053)
    
    This PR speeds up the deletion process by doing the following:
    - Batch whenever possible to minimize the number of requests sent out to other brokers;
    - Refactor `onPartitionDeletion` to remove the usage of `allLiveReplicas`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/controller/ControllerContext.scala |  10 +-
 .../kafka/controller/TopicDeletionManager.scala    | 112 +++++++++++----------
 .../kafka/controller/ControllerContextTest.scala   |  15 ++-
 .../controller/MockPartitionStateMachine.scala     |  12 +++
 .../kafka/controller/MockReplicaStateMachine.scala |  12 +++
 .../controller/TopicDeletionManagerTest.scala      |  28 +++++-
 6 files changed, 114 insertions(+), 75 deletions(-)
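
Before the diff itself, here is a minimal, self-contained Scala sketch of the batching idea described in the commit message above: collect the replicas of all topics being deleted first, then issue one state-change call per target state instead of one call per topic. The PartitionAndReplica case class and the handleStateChanges stub below are simplified stand-ins for illustration only, not the real controller classes shown in the diff.

object BatchedDeletionSketch {
  // Simplified stand-in for kafka.controller.PartitionAndReplica.
  case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Stand-in for ReplicaStateMachine.handleStateChanges: each call corresponds
  // to one batch of controller-to-broker requests.
  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: String): Unit =
    println(s"-> $targetState: ${replicas.size} replicas in one batch")

  def onPartitionDeletion(replicasByTopic: Map[String, Set[PartitionAndReplica]],
                          isOnline: PartitionAndReplica => Boolean): Unit = {
    // Partition the replicas of every topic at once instead of looping topic by topic.
    val (alive, dead) = replicasByTopic.values.flatten.toSeq.partition(isOnline)
    // Dead replicas go straight to the ineligible state; alive replicas are taken
    // offline and then marked as deletion-started, each in a single batched call.
    handleStateChanges(dead, "ReplicaDeletionIneligible")
    handleStateChanges(alive, "OfflineReplica")
    handleStateChanges(alive, "ReplicaDeletionStarted")
  }

  def main(args: Array[String]): Unit = {
    val replicas = Map(
      "foo" -> Set(PartitionAndReplica("foo", 0, 1), PartitionAndReplica("foo", 0, 2)),
      "bar" -> Set(PartitionAndReplica("bar", 0, 1), PartitionAndReplica("bar", 0, 3))
    )
    // Pretend broker 3 is offline in this toy example.
    onPartitionDeletion(replicas, r => r.replica != 3)
  }
}

With this shape, deleting N topics produces a constant number of state-machine calls rather than a number proportional to N, which is the request reduction the commit message refers to.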

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 47cb553..f7a6cdd 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -221,7 +221,9 @@ class ControllerContext {
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
     partitionAssignments.getOrElse(topic, mutable.Map.empty).flatMap {
-      case (partition, assignment) => assignment.replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r))
+      case (partition, assignment) => assignment.replicas.map { r =>
+        PartitionAndReplica(new TopicPartition(topic, partition), r)
+      }
     }.toSet
   }
 
@@ -231,12 +233,6 @@ class ControllerContext {
     }.toSet
   }
 
-  def allLiveReplicas(): Set[PartitionAndReplica] = {
-    replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
-      isReplicaOnline(partitionAndReplica.replica, partitionAndReplica.topicPartition)
-    }
-  }
-
   /**
     * Get all online and offline replicas.
     *
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index d032b3b..64f9ff0 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.Set
+import scala.collection.mutable
 
 trait DeletionClient {
   def deleteTopic(topic: String, epochZkVersion: Int): Unit
@@ -226,12 +227,12 @@ class TopicDeletionManager(config: KafkaConfig,
   /**
    * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
    * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
-   *@param topic Topic for which deletion should be retried
+   * @param topics Topics for which deletion should be retried
    */
-  private def retryDeletionForIneligibleReplicas(topic: String): Unit = {
+  private def retryDeletionForIneligibleReplicas(topics: Set[String]): Unit = {
     // reset replica states from ReplicaDeletionIneligible to OfflineReplica
-    val failedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionIneligible)
-    info(s"Retrying deletion of topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
+    val failedReplicas = topics.flatMap(controllerContext.replicasInState(_, ReplicaDeletionIneligible))
+    debug(s"Retrying deletion of topics ${topics.mkString(",")} since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
     replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
   }
 
@@ -256,9 +257,6 @@ class TopicDeletionManager(config: KafkaConfig,
    * removed from their caches.
    */
   private def onTopicDeletion(topics: Set[String]): Unit = {
-    info(s"Topic deletion callback for ${topics.mkString(",")}")
-    // send update metadata so that brokers stop serving data for topics to be deleted
-    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
     val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted
     if (unseenTopicsForDeletion.nonEmpty) {
       val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
@@ -269,66 +267,61 @@ class TopicDeletionManager(config: KafkaConfig,
       controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
     }
 
-    client.sendMetadataUpdate(partitions)
-    topics.foreach { topic =>
-      onPartitionDeletion(controllerContext.partitionsForTopic(topic))
-    }
-  }
+    // send update metadata so that brokers stop serving data for topics to be deleted
+    client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))
 
-  /**
-   * Invoked by onPartitionDeletion. It is the 2nd step of topic deletion, the first being sending
-   * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
-   * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
-   * is never retried. A topic is removed from the in progress list when
-   * 1. Either the topic is successfully deleted OR
-   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
-   * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
-   * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
-   * the replicas a StopReplicaRequest (delete=true)
-   * This method does the following things -
-   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
-   *    for deletion if some replicas are dead since it won't complete successfully anyway
-   * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
-   * @param replicasForTopicsToBeDeleted
-   */
-  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]): Unit = {
-    replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
-      val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
-      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
-      val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
-      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
-      // move dead replicas directly to failed state
-      replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible)
-      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
-      replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
-      debug(s"Deletion started for replicas ${replicasForDeletionRetry.mkString(",")}")
-      replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted)
-      if (deadReplicasForTopic.nonEmpty) {
-        debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic")
-        markTopicIneligibleForDeletion(Set(topic), reason = "offline replicas")
-      }
-    }
+    onPartitionDeletion(topics)
   }
 
   /**
    * Invoked by onTopicDeletion with the list of partitions for topics to be deleted
    * It does the following -
-   * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
-   *    deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
+   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
+   *    for deletion if some replicas are dead since it won't complete successfully anyway
    * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
    *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
    *    it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
    * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
    *    will delete all persistent data from all replicas of the respective partitions
    */
-  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]): Unit = {
-    info(s"Partition deletion callback for ${partitionsToBeDeleted.mkString(",")}")
-    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
-    startReplicaDeletion(replicasPerPartition)
+  private def onPartitionDeletion(topicsToBeDeleted: Set[String]): Unit = {
+    val allDeadReplicas = mutable.ListBuffer.empty[PartitionAndReplica]
+    val allReplicasForDeletionRetry = mutable.ListBuffer.empty[PartitionAndReplica]
+    val allTopicsIneligibleForDeletion = mutable.Set.empty[String]
+
+    topicsToBeDeleted.foreach { topic =>
+      val (aliveReplicas, deadReplicas) = controllerContext.replicasForTopic(topic).partition { r =>
+        controllerContext.isReplicaOnline(r.replica, r.topicPartition)
+      }
+
+      val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
+      val replicasForDeletionRetry = aliveReplicas -- successfullyDeletedReplicas
+
+      allDeadReplicas ++= deadReplicas
+      allReplicasForDeletionRetry ++= replicasForDeletionRetry
+
+      if (deadReplicas.nonEmpty) {
+        debug(s"Dead Replicas (${deadReplicas.mkString(",")}) found for topic $topic")
+        allTopicsIneligibleForDeletion += topic
+      }
+    }
+
+    // move dead replicas directly to failed state
+    replicaStateMachine.handleStateChanges(allDeadReplicas, ReplicaDeletionIneligible)
+    // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
+    replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, OfflineReplica)
+    replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, ReplicaDeletionStarted)
+
+    if (allTopicsIneligibleForDeletion.nonEmpty) {
+      markTopicIneligibleForDeletion(allTopicsIneligibleForDeletion, reason = "offline replicas")
+    }
   }
 
   private def resumeDeletions(): Unit = {
     val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
+    val topicsEligibleForRetry = mutable.Set.empty[String]
+    val topicsEligibleForDeletion = mutable.Set.empty[String]
+
     if (topicsQueuedForDeletion.nonEmpty)
       info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
 
@@ -343,16 +336,25 @@ class TopicDeletionManager(config: KafkaConfig,
         // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
         // or there is at least one failed replica (which means topic deletion should be retried).
         if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
-          retryDeletionForIneligibleReplicas(topic)
+          topicsEligibleForRetry += topic
         }
       }
 
-      // Try delete topic if it is eligible for deletion.
+      // Add topic to the eligible set if it is eligible for deletion.
       if (isTopicEligibleForDeletion(topic)) {
         info(s"Deletion of topic $topic (re)started")
-        // topic deletion will be kicked off
-        onTopicDeletion(Set(topic))
+        topicsEligibleForDeletion += topic
       }
     }
+
+    // topic deletion retry will be kicked off
+    if (topicsEligibleForRetry.nonEmpty) {
+      retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
+    }
+
+    // topic deletion will be kicked off
+    if (topicsEligibleForDeletion.nonEmpty) {
+      onTopicDeletion(topicsEligibleForDeletion)
+    }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index fd8d3e7..39023fa 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -18,6 +18,7 @@
 package unit.kafka.controller
 
 import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.PartitionAndReplica
 import kafka.controller.{ControllerContext, ReplicaAssignment}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
@@ -50,14 +51,12 @@ class ControllerContextTest {
 
     // Simple round-robin replica assignment
     var leaderIndex = 0
-    Seq(tp1, tp2, tp3).foreach {
-      partition =>
-        val replicas = brokers.indices.map { i =>
-          val replica = brokers((i + leaderIndex) % brokers.size)
-          replica
-        }
-        context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
-        leaderIndex += 1
+    Seq(tp1, tp2, tp3).foreach { partition =>
+      val replicas = brokers.indices.map { i =>
+        brokers((i + leaderIndex) % brokers.size)
+      }
+      context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
+      leaderIndex += 1
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 0c6c00d..b29a3d9 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -27,11 +27,23 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
                                 uncleanLeaderElectionEnabled: Boolean)
   extends PartitionStateMachine(controllerContext) {
 
+  var stateChangesByTargetState = mutable.Map.empty[PartitionState, Int].withDefaultValue(0)
+
+  def stateChangesCalls(targetState: PartitionState): Int = {
+    stateChangesByTargetState(targetState)
+  }
+
+  def clear(): Unit = {
+    stateChangesByTargetState.clear()
+  }
+
   override def handleStateChanges(
     partitions: Seq[TopicPartition],
     targetState: PartitionState,
     leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
+    stateChangesByTargetState(targetState) = stateChangesByTargetState(targetState) + 1
+
     partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
     val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
     if (invalidPartitions.nonEmpty) {
diff --git a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
index e5207bf..32bfc50 100644
--- a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
@@ -17,10 +17,22 @@
 package kafka.controller
 
 import scala.collection.Seq
+import scala.collection.mutable
 
 class MockReplicaStateMachine(controllerContext: ControllerContext) extends ReplicaStateMachine(controllerContext) {
+  val stateChangesByTargetState = mutable.Map.empty[ReplicaState, Int].withDefaultValue(0)
+
+  def stateChangesCalls(targetState: ReplicaState): Int = {
+    stateChangesByTargetState(targetState)
+  }
+
+  def clear(): Unit = {
+    stateChangesByTargetState.clear()
+  }
 
   override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
+    stateChangesByTargetState(targetState) = stateChangesByTargetState(targetState) + 1
+
     replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
     val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
     if (invalidReplicas.nonEmpty) {
diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
index 33479c1..b1b8c24 100644
--- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
@@ -76,25 +76,43 @@ class TopicDeletionManagerTest {
 
     val fooPartitions = controllerContext.partitionsForTopic("foo")
     val fooReplicas = controllerContext.replicasForPartition(fooPartitions).toSet
+    val barPartitions = controllerContext.partitionsForTopic("bar")
+    val barReplicas = controllerContext.replicasForPartition(barPartitions).toSet
+
+    // Clean up state changes before starting the deletion
+    replicaStateMachine.clear()
+    partitionStateMachine.clear()
 
     // Queue the topic for deletion
-    deletionManager.enqueueTopicsForDeletion(Set("foo"))
+    deletionManager.enqueueTopicsForDeletion(Set("foo", "bar"))
 
     assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition))
     assertEquals(fooReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
-    verify(deletionClient).sendMetadataUpdate(fooPartitions)
-    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
-    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(barPartitions, controllerContext.partitionsInState("bar", NonExistentPartition))
+    assertEquals(barReplicas, controllerContext.replicasInState("bar", ReplicaDeletionStarted))
+    verify(deletionClient).sendMetadataUpdate(fooPartitions ++ barPartitions)
+    assertEquals(Set("foo", "bar"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo", "bar"), controllerContext.topicsWithDeletionStarted)
     assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
 
     // Complete the deletion
-    deletionManager.completeReplicaDeletion(fooReplicas)
+    deletionManager.completeReplicaDeletion(fooReplicas ++ barReplicas)
 
     assertEquals(Set.empty, controllerContext.partitionsForTopic("foo"))
     assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "foo"))
+    assertEquals(Set.empty, controllerContext.partitionsForTopic("bar"))
+    assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "bar"))
     assertEquals(Set(), controllerContext.topicsToBeDeleted)
     assertEquals(Set(), controllerContext.topicsWithDeletionStarted)
     assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+
+    assertEquals(1, partitionStateMachine.stateChangesCalls(OfflinePartition))
+    assertEquals(1, partitionStateMachine.stateChangesCalls(NonExistentPartition))
+
+    assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionIneligible))
+    assertEquals(1, replicaStateMachine.stateChangesCalls(OfflineReplica))
+    assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionStarted))
+    assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionSuccessful))
   }
 
   @Test


[kafka] 01/02: MINOR: Fix unnecessary metadata fetch before group assignment (#8095)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a73636070a00e8bd7038f10a000db2fd4985cf5f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 12 11:45:06 2020 -0800

    MINOR: Fix unnecessary metadata fetch before group assignment (#8095)
    
    The recent increase in the flakiness of one of the offset reset tests (KAFKA-9538) traces back to https://github.com/apache/kafka/pull/7941. After investigation, we found that following this patch, the consumer was sending an additional metadata request prior to performing the group assignment. This slight timing difference was enough to trigger the test failures. The problem turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no longer counted the local subscri [...]
    
    Without the fix, we saw 30-50% test failures locally. With it, I could no longer reproduce the failure. However, #6561 is probably still needed to improve the resilience of this test.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../consumer/ConsumerPartitionAssignor.java        |  8 ++++++
 .../consumer/internals/ConsumerCoordinator.java    |  3 --
 .../consumer/internals/SubscriptionState.java      | 12 ++++----
 .../consumer/internals/SubscriptionStateTest.java  | 20 +++++++++++--
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 14 +++++----
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 33 ++++++++++++++++------
 6 files changed, 67 insertions(+), 23 deletions(-)
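
As a rough illustration of the behavioural change in this commit (the real client code is the Java SubscriptionState shown in the diff below), here is a minimal Scala sketch of the corrected groupSubscribe semantics: the group subscription is replaced rather than accumulated, and a metadata refresh is only signalled when the group is interested in topics outside the leader's own subscription. All names below are illustrative stand-ins, not the actual consumer internals.

object GroupSubscribeSketch {
  var subscription: Set[String] = Set("topic1")      // the leader's own subscription
  var groupSubscription: Set[String] = subscription  // union of subscriptions across the group

  def groupSubscribe(topics: Set[String]): Boolean = {
    groupSubscription = topics                        // replace, do not accumulate
    !groupSubscription.subsetOf(subscription)         // refresh only for topics we do not already track
  }

  def main(args: Array[String]): Unit = {
    println(groupSubscribe(Set("topic1")))            // false: nothing new, no metadata fetch needed
    println(groupSubscribe(Set("topic", "topic1")))   // true: "topic" is outside the local subscription
    println(groupSubscribe(Set("topic1")))            // false again: earlier group topics do not linger
  }
}

Returning false when the group subscription is covered by the local subscription is what avoids the extra metadata request before assignment that made the offset reset test flaky.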

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index f9a4217..8708ea4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -154,6 +154,14 @@ public interface ConsumerPartitionAssignor {
         public ByteBuffer userData() {
             return userData;
         }
+
+        @Override
+        public String toString() {
+            return "Assignment(" +
+                    "partitions=" + partitions +
+                    (userData == null ? "" : ", userDataSize=" + userData.remaining()) +
+                    ')';
+        }
     }
 
     final class GroupSubscription {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 94209d2..999921f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1370,9 +1370,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic);
         }
 
-        Map<String, Integer> partitionsPerTopic() {
-            return partitionsPerTopic;
-        }
     }
 
     private static class OffsetCommitCompletion {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 9200eb8..6568c91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -186,15 +186,17 @@ public class SubscriptionState {
     }
 
     /**
-     * Add topics to the current group subscription. This is used by the group leader to ensure
+     * Set the current group subscription. This is used by the group leader to ensure
      * that it receives metadata updates for all topics that the group is interested in.
-     * @param topics The topics to add to the group subscription
+     *
+     * @param topics All topics from the group subscription
+     * @return true if the group subscription contains topics which are not part of the local subscription
      */
     synchronized boolean groupSubscribe(Collection<String> topics) {
         if (!hasAutoAssignedPartitions())
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
-        groupSubscription = new HashSet<>(groupSubscription);
-        return groupSubscription.addAll(topics);
+        groupSubscription = new HashSet<>(topics);
+        return !subscription.containsAll(groupSubscription);
     }
 
     /**
@@ -328,7 +330,7 @@ public class SubscriptionState {
     }
 
     /**
-     * Get the subcription topics for which metadata is required . For the leader, this will include
+     * Get the subscription topics for which metadata is required. For the leader, this will include
      * the union of the subscriptions of all group members. For followers, it is just that member's
      * subscription. This is used when querying topic metadata to detect the metadata changes which would
      * require rebalancing. The leader fetches metadata for all topics in the group so that it
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 74048ad..96f08f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -106,13 +106,29 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void testGroupSubscribe() {
+        state.subscribe(singleton(topic1), rebalanceListener);
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1)));
+        assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics());
+
+        // `groupSubscribe` does not accumulate
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+    }
+
+    @Test
     public void partitionAssignmentChangeOnPatternSubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
@@ -244,7 +260,7 @@ public class SubscriptionStateTest {
     @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
-        state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
     }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 97b638f..853b2ca 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -65,20 +65,24 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   }
 
   def committedOffsets(topic: String = topic, group: String = group): Map[TopicPartition, Long] = {
-    val props = new Properties
-    props.put("bootstrap.servers", brokerList)
-    props.put("group.id", group)
-    val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
+    val consumer = createNoAutoCommitConsumer(group)
     try {
       val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
         .asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)}
-
       consumer.committed(partitions.asJava).asScala.filter(_._2 != null).mapValues(_.offset()).toMap
     } finally {
       consumer.close()
     }
   }
 
+  def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] = {
+    val props = new Properties
+    props.put("bootstrap.servers", brokerList)
+    props.put("group.id", group)
+    props.put("enable.auto.commit", "false")
+    new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
+  }
+
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
     val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 838444c..f9322b3 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,8 +16,6 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
-import scala.collection.Seq
-
 import joptsimple.OptionException
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
@@ -28,6 +26,9 @@ import org.apache.kafka.test
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
 class TimeConversionTests {
 
   @Test
@@ -462,12 +463,28 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     executor.shutdown()
   }
 
-  private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = {
-    TestUtils.waitUntilTrue(() => {
-      val offsets = committedOffsets(topic = topic, group = group).values
-      count == offsets.sum
-    }, "Expected that consumer group has consumed all messages from topic/partition. " +
-      s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
+  private def awaitConsumerProgress(topic: String = topic,
+                                    group: String = group,
+                                    count: Long): Unit = {
+    val consumer = createNoAutoCommitConsumer(group)
+    try {
+      val partitions = consumer.partitionsFor(topic).asScala.map { partitionInfo =>
+        new TopicPartition(partitionInfo.topic, partitionInfo.partition)
+      }.toSet
+
+      TestUtils.waitUntilTrue(() => {
+        val committed = consumer.committed(partitions.asJava).values.asScala
+        val total = committed.foldLeft(0L) { case (currentSum, offsetAndMetadata) =>
+          currentSum + Option(offsetAndMetadata).map(_.offset).getOrElse(0L)
+        }
+        total == count
+      }, "Expected that consumer group has consumed all messages from topic/partition. " +
+        s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
+
+    } finally {
+      consumer.close()
+    }
+
   }
 
   private def resetAndAssertOffsets(args: Array[String],