You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/16 22:03:18 UTC
kafka git commit: MINOR: Follow-up from KAFKA-2720 with comment/style
fixes
Repository: kafka
Updated Branches:
refs/heads/trunk 7d9d1cb23 -> f62db5dd8
MINOR: Follow-up from KAFKA-2720 with comment/style fixes
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1513 from hachikuji/followup-for-kafka-2720
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f62db5dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f62db5dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f62db5dd
Branch: refs/heads/trunk
Commit: f62db5dd8886f467e1de7e534a9ef3a839a08e84
Parents: 7d9d1cb
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 17 00:02:57 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 17 00:02:57 2016 +0200
----------------------------------------------------------------------
.../src/main/scala/kafka/coordinator/GroupCoordinator.scala | 8 +++++---
.../main/scala/kafka/coordinator/GroupMetadataManager.scala | 9 ++-------
2 files changed, 7 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f62db5dd/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 9c75f83..0d02a4c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -666,11 +666,13 @@ class GroupCoordinator(val brokerId: Int,
group.initNextGeneration()
if (group.is(Empty)) {
info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+
delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
if (errorCode != Errors.NONE.code) {
- // we failed to persist the empty group. if we don't retry (which is how
- // we handle the situation when a normal rebalance fails, then a coordinator
- // change will cause the old generation to come back to life.
+ // we failed to write the empty group metadata. If the broker fails before another rebalance,
+ // the previous generation written to the log will become active again (and most likely timeout).
+ // This should be safe since there are no active members in an empty generation, so we just warn.
+ warn(s"Failed to write empty metadata for group ${group.groupId}: ${Errors.forCode(errorCode).message()}")
}
}))
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f62db5dd/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 915c360..e75e23b 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -515,8 +515,8 @@ class GroupMetadataManager(val brokerId: Int,
/**
* When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
* that partition.
- *
- * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+ *
+ * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
@@ -532,11 +532,6 @@ class GroupMetadataManager(val brokerId: Int,
// to prevent coordinator's check-and-get-group race condition
ownedPartitions.remove(offsetsPartition)
- /**
- * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
- * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
- * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
- */
for (group <- groupMetadataCache.values) {
if (partitionFor(group.groupId) == offsetsPartition) {
onGroupUnloaded(group)