You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/06 11:53:52 UTC

kafka git commit: KAFKA-4472; offsetRetentionMs miscalculated in GroupCoordinator

Repository: kafka
Updated Branches:
  refs/heads/trunk 34aa538bf -> f4f0f8222


KAFKA-4472; offsetRetentionMs miscalculated in GroupCoordinator

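Fix possible integer overflow: the minutes-to-milliseconds conversion multiplied
offsetsRetentionMinutes by the Int literal 60 before widening to Long, so large
values could wrap around; it now multiplies by 60L.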

Author: Kim Christensen <ki...@mvno.dk>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2200 from kichristensen/MiscalculatedOffsetRetention


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4f0f822
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4f0f822
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4f0f822

Branch: refs/heads/trunk
Commit: f4f0f8222276b630b61355b30827bf1b50160712
Parents: 34aa538
Author: Kim Christensen <ki...@mvno.dk>
Authored: Tue Dec 6 11:15:58 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Dec 6 11:26:12 2016 +0000

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    | 24 +++++++------
 .../coordinator/GroupCoordinatorTest.scala      | 36 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4f0f822/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index eb479d5..0c53345 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -776,22 +776,26 @@ object GroupCoordinator {
     apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
   }
 
+  private[coordinator] def offsetConfig(config: KafkaConfig) = OffsetConfig(
+    maxMetadataSize = config.offsetMetadataMaxSize,
+    loadBufferSize = config.offsetsLoadBufferSize,
+    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
+    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
+    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
+    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
+  )
+
   def apply(config: KafkaConfig,
             zkUtils: ZkUtils,
             replicaManager: ReplicaManager,
             heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
             joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
             time: Time): GroupCoordinator = {
-    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+    val offsetConfig = this.offsetConfig(config)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4f0f822/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala
new file mode 100644
index 0000000..8221d1e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala
@@ -0,0 +1,36 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.coordinator
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.Test
+import org.junit.Assert._
+
+class GroupCoordinatorTest {
+
+  @Test
+  def testOffsetsRetentionMsIntegerOverflow() {
+    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString)
+    val config = KafkaConfig.fromProps(props)
+    val offsetConfig = GroupCoordinator.offsetConfig(config)
+    assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L)
+  }
+
+}