You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/16 22:03:18 UTC

kafka git commit: MINOR: Follow-up from KAFKA-2720 with comment/style fixes

Repository: kafka
Updated Branches:
  refs/heads/trunk 7d9d1cb23 -> f62db5dd8


MINOR: Follow-up from KAFKA-2720 with comment/style fixes

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1513 from hachikuji/followup-for-kafka-2720


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f62db5dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f62db5dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f62db5dd

Branch: refs/heads/trunk
Commit: f62db5dd8886f467e1de7e534a9ef3a839a08e84
Parents: 7d9d1cb
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 17 00:02:57 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 17 00:02:57 2016 +0200

----------------------------------------------------------------------
 .../src/main/scala/kafka/coordinator/GroupCoordinator.scala | 8 +++++---
 .../main/scala/kafka/coordinator/GroupMetadataManager.scala | 9 ++-------
 2 files changed, 7 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f62db5dd/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 9c75f83..0d02a4c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -666,11 +666,13 @@ class GroupCoordinator(val brokerId: Int,
         group.initNextGeneration()
         if (group.is(Empty)) {
           info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+
           delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
             if (errorCode != Errors.NONE.code) {
-              // we failed to persist the empty group. if we don't retry (which is how
-              // we handle the situation when a normal rebalance fails, then a coordinator
-              // change will cause the old generation to come back to life.
+              // we failed to write the empty group metadata. If the broker fails before another rebalance,
+              // the previous generation written to the log will become active again (and most likely timeout).
+              // This should be safe since there are no active members in an empty generation, so we just warn.
+              warn(s"Failed to write empty metadata for group ${group.groupId}: ${Errors.forCode(errorCode).message()}")
             }
           }))
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f62db5dd/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 915c360..e75e23b 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -515,8 +515,8 @@ class GroupMetadataManager(val brokerId: Int,
   /**
    * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
    * that partition.
-    *
-    * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+   *
+   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
@@ -532,11 +532,6 @@ class GroupMetadataManager(val brokerId: Int,
         // to prevent coordinator's check-and-get-group race condition
         ownedPartitions.remove(offsetsPartition)
 
-        /**
-         * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
-         * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
-         * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
-         */
         for (group <- groupMetadataCache.values) {
           if (partitionFor(group.groupId) == offsetsPartition) {
             onGroupUnloaded(group)