You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/09 19:20:21 UTC

kafka git commit: HOTFIX: bug updating cache when loading group metadata

Repository: kafka
Updated Branches:
  refs/heads/trunk e9fc7b8c8 -> 2b04004de


HOTFIX: bug updating cache when loading group metadata

The bug causes only the first instance of each group's metadata in the topic to be written to the cache (because addGroup uses putIfNotExists), so any later, more recent metadata records for the same group are ignored during loading. Coordinator fail-over won't work properly unless the cache is loaded with the right metadata.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #462 from hachikuji/hotfix-group-loading


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b04004d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b04004d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b04004d

Branch: refs/heads/trunk
Commit: 2b04004de878823fe631af1f3f85108c0b38caec
Parents: e9fc7b8
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Nov 9 10:26:17 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 9 10:26:17 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupMetadataManager.scala      | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b04004d/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index f6b8103..f98fc74 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -125,6 +125,13 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   /**
+   * Update the current cached metadata for the group with the given groupId or add the group if there is none.
+   */
+  private def updateGroup(groupId: String, group: GroupMetadata) {
+    groupsCache.put(groupId, group)
+  }
+
+  /**
    * Remove all metadata associated with the group, note this function needs to be
    * called inside the group lock
    * @param group
@@ -401,9 +408,10 @@ class GroupMetadataManager(val brokerId: Int,
                     // load group metadata
                     val groupId = baseKey.key.asInstanceOf[String]
                     val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
-
-                    if (groupMetadata != null)
-                      addGroup(groupId, groupMetadata)
+                    if (groupMetadata != null) {
+                      trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
+                      updateGroup(groupId, groupMetadata)
+                    }
                   }
 
                   currOffset = msgAndOffset.nextOffset