You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/17 03:48:40 UTC
kafka git commit: kafka-1864;
Revisit defaults for the internal offsets topic; patched by Jun Rao;
reviewed by Jeol Koshy, Neha Narkhede, and Gwen Shapira
Repository: kafka
Updated Branches:
refs/heads/0.8.2 1d3fd0f6c -> 7a313999d
kafka-1864; Revisit defaults for the internal offsets topic; patched by Jun Rao; reviewed by Jeol Koshy, Neha Narkhede, and Gwen Shapira
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a313999
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a313999
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a313999
Branch: refs/heads/0.8.2
Commit: 7a313999d0d2c846541299ded77b25eddf1bf554
Parents: 1d3fd0f
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 16 18:48:33 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 16 18:48:33 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaApis.scala | 11 +++++++++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++++-
core/src/main/scala/kafka/server/OffsetManager.scala | 4 ++--
.../kafka/api/ProducerFailureHandlingTest.scala | 7 +++++++
.../test/scala/unit/kafka/server/OffsetCommitTest.scala | 7 +++++++
5 files changed, 30 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a313999/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d626b17..7def852 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -498,10 +498,17 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
try {
if (topic == OffsetManager.OffsetsTopicName) {
- AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+ val aliveBrokers = metadataCache.getAliveBrokers
+ val offsetsTopicReplicationFactor =
+ if (aliveBrokers.length > 0)
+ Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
+ else
+ config.offsetsTopicReplicationFactor
+ AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
+ offsetsTopicReplicationFactor,
offsetManager.offsetsTopicConfig)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
- .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
+ .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
}
else {
AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a313999/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e26c54..e3396ad 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -304,7 +304,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val offsetsLoadBufferSize = props.getIntInRange("offsets.load.buffer.size",
OffsetManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE))
- /** The replication factor for the offset commit topic (set higher to ensure availability). */
+ /** The replication factor for the offsets topic (set higher to ensure availability). To
+ * ensure that the effective replication factor of the offsets topic is the configured value,
+ * the number of alive brokers has to be at least the replication factor at the time of the
+ * first request for the offsets topic. If not, either the offsets topic creation will fail or
+ * it will get a replication factor of min(alive brokers, configured replication factor) */
val offsetsTopicReplicationFactor: Short = props.getShortInRange("offsets.topic.replication.factor",
OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor, (1, Short.MaxValue))
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a313999/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 43eb2a3..d3e8868 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -74,9 +74,9 @@ object OffsetManagerConfig {
val DefaultMaxMetadataSize = 4096
val DefaultLoadBufferSize = 5*1024*1024
val DefaultOffsetsRetentionCheckIntervalMs = 600000L
- val DefaultOffsetsTopicNumPartitions = 1
+ val DefaultOffsetsTopicNumPartitions = 50
val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
- val DefaultOffsetsTopicReplicationFactor = 1.toShort
+ val DefaultOffsetsTopicReplicationFactor = 3.toShort
val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
val DefaultOffsetCommitTimeoutMs = 5000
val DefaultOffsetCommitRequiredAcks = (-1).toShort
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a313999/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 07a7bee..a75197c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -46,6 +46,13 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
override val zkConnect = TestZKUtils.zookeeperConnect
override val autoCreateTopicsEnable = false
override val messageMaxBytes = serverMessageMaxBytes
+ // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
+ // the broker. As a result, the live broker list in metadataCache is empty. If the number of live brokers is 0, we
+ // try to create the offset topic with the default offsets.topic.replication.factor of 3. The creation will fail
+ // since there is not enough live brokers. This causes testCannotSendToInternalTopic() to fail. Temporarily fixing
+ // the issue by overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we need to
+ // remove the following config override.
+ override val offsetsTopicReplicationFactor = 1.asInstanceOf[Short]
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a313999/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 4a3a5b2..5b93239 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -46,6 +46,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
val config: Properties = createBrokerConfig(1, brokerPort)
+ // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
+ // the broker. As a result, the live broker list in metadataCache is empty. This causes the ConsumerMetadataRequest
+ // to fail since if the number of live brokers is 0, we try to create the offset topic with the default
+ // offsets.topic.replication.factor of 3. The creation will fail since there is not enough live brokers. In order
+ // for the unit test to pass, overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we
+ // need to remove the following config override.
+ config.put("offsets.topic.replication.factor", "1")
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
time = new MockTime()