You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/06 11:53:52 UTC
kafka git commit: KAFKA-4472; offsetRetentionMs miscalculated in GroupCoordinator
Repository: kafka
Updated Branches:
refs/heads/trunk 34aa538bf -> f4f0f8222
KAFKA-4472; offsetRetentionMs miscalculated in GroupCoordinator
Fix possible integer overflow.
Author: Kim Christensen <ki...@mvno.dk>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2200 from kichristensen/MiscalculatedOffsetRetention
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4f0f822
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4f0f822
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4f0f822
Branch: refs/heads/trunk
Commit: f4f0f8222276b630b61355b30827bf1b50160712
Parents: 34aa538
Author: Kim Christensen <ki...@mvno.dk>
Authored: Tue Dec 6 11:15:58 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Dec 6 11:26:12 2016 +0000
----------------------------------------------------------------------
.../kafka/coordinator/GroupCoordinator.scala | 24 +++++++------
.../coordinator/GroupCoordinatorTest.scala | 36 ++++++++++++++++++++
2 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4f0f822/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index eb479d5..0c53345 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -776,22 +776,26 @@ object GroupCoordinator {
apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
}
+ private[coordinator] def offsetConfig(config: KafkaConfig) = OffsetConfig(
+ maxMetadataSize = config.offsetMetadataMaxSize,
+ loadBufferSize = config.offsetsLoadBufferSize,
+ offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
+ offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+ offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+ offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
+ offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+ offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
+ offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+ offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
+ )
+
def apply(config: KafkaConfig,
zkUtils: ZkUtils,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
time: Time): GroupCoordinator = {
- val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
- loadBufferSize = config.offsetsLoadBufferSize,
- offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
- offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions = config.offsetsTopicPartitions,
- offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
- offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
- offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
- offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
- offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+ val offsetConfig = this.offsetConfig(config)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4f0f822/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala
new file mode 100644
index 0000000..8221d1e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorTest.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.Test
+import org.junit.Assert._
+
+class GroupCoordinatorTest {
+
+ @Test
+ def testOffsetsRetentionMsIntegerOverflow() {
+ val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+ props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString)
+ val config = KafkaConfig.fromProps(props)
+ val offsetConfig = GroupCoordinator.offsetConfig(config)
+ assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L)
+ }
+
+}