You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 22:00:58 UTC

kafka git commit: KAFKA-2795: fix potential NPE in GroupMetadataManager.addGroup

Repository: kafka
Updated Branches:
  refs/heads/trunk 1d884d1f6 -> c455e608c


KAFKA-2795: fix potential NPE in GroupMetadataManager.addGroup

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Onur Karaman, Guozhang Wang

Closes #488 from hachikuji/KAFKA-2795


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c455e608
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c455e608
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c455e608

Branch: refs/heads/trunk
Commit: c455e608c1f2c7be6ff0a721f49c1fe3ede0165f
Parents: 1d884d1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Nov 10 13:06:55 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 10 13:06:55 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupMetadataManager.scala     | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c455e608/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index f98fc74..047970e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -67,9 +67,6 @@ class GroupMetadataManager(val brokerId: Int,
   /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
   private val offsetExpireLock = new ReentrantReadWriteLock()
 
-  /* lock for removing offsets of a range partition, it should be always called BEFORE the group lock if needed */
-  private val offsetRemoveLock = new ReentrantReadWriteLock()
-
   /* shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
@@ -116,12 +113,12 @@ class GroupMetadataManager(val brokerId: Int,
    * Add a group or get the group associated with the given groupId if it already exists
    */
   def addGroup(groupId: String, protocolType: String): GroupMetadata = {
-    addGroup(groupId, new GroupMetadata(groupId, protocolType))
-  }
-
-  private def addGroup(groupId: String, group: GroupMetadata): GroupMetadata = {
-    groupsCache.putIfNotExists(groupId, group)
-    groupsCache.get(groupId)
+    val newGroup = new GroupMetadata(groupId, protocolType)
+    val currentGroup = groupsCache.putIfNotExists(groupId, newGroup)
+    if (currentGroup != null)
+      currentGroup
+    else
+      newGroup
   }
 
   /**