You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/12 03:26:37 UTC
[10/11] git commit: Use uniform convention for naming properties keys;
kafka-648; patched by Sriram Subramanian; reviewed by Jun Rao
Use uniform convention for naming properties keys; kafka-648; patched by Sriram Subramanian; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4095319
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4095319
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4095319
Branch: refs/heads/trunk
Commit: a40953196e1ef558eb61b78219a20c20a4bd63df
Parents: dbe87f6
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 11 16:12:57 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 11 16:12:57 2013 -0800
----------------------------------------------------------------------
config/consumer.properties | 2 +-
config/producer.properties | 25 +----
config/server.properties | 34 +++----
.../main/java/kafka/etl/impl/DataGenerator.java | 2 +-
.../kafka/bridge/hadoop/KafkaOutputFormat.java | 3 +-
core/src/main/scala/kafka/client/ClientUtils.scala | 2 +-
core/src/main/scala/kafka/cluster/Partition.scala | 10 +-
.../scala/kafka/consumer/ConsoleConsumer.scala | 16 ++--
.../main/scala/kafka/consumer/ConsumerConfig.scala | 32 +++---
.../kafka/consumer/ConsumerFetcherThread.scala | 8 +-
.../consumer/ZookeeperConsumerConnector.scala | 20 ++--
core/src/main/scala/kafka/log/LogManager.scala | 34 ++++----
.../scala/kafka/producer/ConsoleProducer.scala | 10 +-
.../scala/kafka/producer/KafkaLog4jAppender.scala | 4 +-
core/src/main/scala/kafka/producer/Producer.scala | 12 +-
.../main/scala/kafka/producer/ProducerConfig.scala | 25 +++---
.../main/scala/kafka/producer/SyncProducer.scala | 2 +-
.../scala/kafka/producer/SyncProducerConfig.scala | 10 +-
.../kafka/producer/async/AsyncProducerConfig.scala | 8 +-
.../kafka/producer/async/DefaultEventHandler.scala | 9 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 6 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 72 ++++++++-------
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 8 +-
.../main/scala/kafka/server/ReplicaManager.scala | 6 +-
.../main/scala/kafka/tools/KafkaMigrationTool.java | 4 +-
.../main/scala/kafka/tools/ReplayLogProducer.scala | 14 ++--
core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +-
.../scala/other/kafka/TestEndToEndLatency.scala | 4 +-
.../scala/other/kafka/TestLogPerformance.scala | 2 +-
.../scala/other/kafka/TestZKConsumerOffsets.scala | 2 +-
.../kafka/integration/AutoOffsetResetTest.scala | 4 +-
.../integration/ProducerConsumerTestHarness.scala | 8 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 22 ++--
.../test/scala/unit/kafka/log/LogOffsetTest.scala | 6 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 36 ++++----
.../unit/kafka/producer/AsyncProducerTest.scala | 8 +-
.../scala/unit/kafka/producer/ProducerTest.scala | 16 ++--
.../unit/kafka/producer/SyncProducerTest.scala | 17 ++--
.../unit/kafka/server/ISRExpirationTest.scala | 10 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 8 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 24 +++---
.../src/main/java/kafka/examples/Consumer.java | 8 +-
.../scala/kafka/perf/ConsumerPerformance.scala | 12 +-
.../scala/kafka/perf/ProducerPerformance.scala | 16 ++--
.../config/mirror_producer.properties | 4 +-
.../config/mirror_producer1.properties | 4 +-
.../config/mirror_producer2.properties | 4 +-
.../config/mirror_producer3.properties | 4 +-
.../config/server_source1.properties | 20 ++--
.../config/server_source2.properties | 20 ++--
.../config/server_source3.properties | 20 ++--
.../config/server_source4.properties | 20 ++--
.../config/server_target1.properties | 20 ++--
.../config/server_target2.properties | 20 ++--
.../config/server_target3.properties | 20 ++--
.../config/whitelisttest.consumer.properties | 6 +-
system_test/common/util.sh | 8 +-
.../config/migration_producer.properties | 29 +-----
.../config/server.properties | 36 ++++----
.../config/blacklisttest.consumer.properties | 6 +-
.../mirror_maker/config/mirror_producer.properties | 4 +-
.../config/server_source_1_1.properties | 20 ++--
.../config/server_source_1_2.properties | 20 ++--
.../config/server_source_2_1.properties | 20 ++--
.../config/server_source_2_2.properties | 20 ++--
.../config/server_target_1_1.properties | 20 ++--
.../config/server_target_1_2.properties | 20 ++--
.../config/whitelisttest_1.consumer.properties | 6 +-
.../config/whitelisttest_2.consumer.properties | 6 +-
.../config/mirror_consumer.properties | 20 ++--
.../config/mirror_producer.properties | 2 +-
.../config/server.properties | 56 ++++++------
system_test/producer_perf/config/server.properties | 20 ++--
.../replication_testsuite/config/server.properties | 56 ++++++------
76 files changed, 541 insertions(+), 585 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/config/consumer.properties
----------------------------------------------------------------------
diff --git a/config/consumer.properties b/config/consumer.properties
index a067ac0..1c43bf9 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -23,7 +23,7 @@ zk.connect=127.0.0.1:2181
zk.connectiontimeout.ms=1000000
#consumer group id
-groupid=test-consumer-group
+group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index eb36691..a1c8cb2 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -36,35 +36,18 @@ serializer.class=kafka.serializer.StringEncoder
# allow topic level compression
#compressed.topics=
-# max message size; messages larger than that size are discarded; default is 1000000
-#max.message.size=
-
-
############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.time=
+#queue.buffering.max.ms=
# the maximum size of the blocking queue for buffering on the producer
-#queue.size=
+#queue.buffering.max.messages=
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueueTimeout.ms=
+#queue.enqueue.timeout.ms=
# the number of messages batched at the producer
-#batch.size=
-
-# the callback handler for one or multiple events
-#callback.handler=
-
-# properties required to initialize the callback handler
-#callback.handler.props=
-
-# the handler for events
-#event.handler=
-
-# properties required to initialize the event handler
-#event.handler.props=
-
+#batch.num.messages=
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index f4521fb..9a9cd06 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -17,7 +17,7 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
-brokerid=0
+broker.id=0
############################# Socket Server Settings #############################
@@ -27,22 +27,22 @@ port=9092
# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
-#hostname=localhost
+#host.name=localhost
# The number of threads handling network requests
-network.threads=2
+num.network.threads=2
# The number of threads doing disk I/O
-io.threads=2
+num.io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
+socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
+socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
+socket.request.max.bytes=104857600
############################# Log Basics #############################
@@ -54,9 +54,6 @@ log.dir=/tmp/kafka-logs
# for consumption, but also mean more files.
num.partitions=1
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
-
############################# Log Flush Policy #############################
# The following configurations control the flush of data to disk. This is the most
@@ -69,16 +66,13 @@ num.partitions=1
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
+log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
-
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
+log.flush.interval.ms=1000
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
############################# Log Retention Policy #############################
@@ -91,11 +85,11 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index 7f70f9e..df17978 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -71,7 +71,7 @@ public class DataGenerator {
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
- producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
+ producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 9a1c359..2fd2035 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -119,10 +119,9 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
job.setInt("kafka.output.compression_codec", compressionCodec);
props.setProperty("producer.type", producerType);
- props.setProperty("buffer.size", Integer.toString(bufSize));
+ props.setProperty("send.buffer.bytes", Integer.toString(bufSize));
props.setProperty("connect.timeout.ms", Integer.toString(timeout));
props.setProperty("reconnect.interval", Integer.toString(interval));
- props.setProperty("max.message.size", Integer.toString(maxSize));
props.setProperty("compression.codec", Integer.toString(compressionCodec));
if (uri.getScheme().equals("kafka")) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 968a91f..af5d231 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -61,7 +61,7 @@ object ClientUtils extends Logging{
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
val props = new Properties()
props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
- props.put("clientid", clientId)
+ props.put("client.id", clientId)
val producerConfig = new ProducerConfig(props)
fetchTopicMetadata(topics, brokers, producerConfig, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ca542f..ea5b5a0 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -261,11 +261,11 @@ class Partition(val topic: String,
.format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
- def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) {
+ def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
- val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
+ val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
@@ -281,12 +281,12 @@ class Partition(val topic: String,
}
}
- def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
+ def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
/**
* there are two cases that need to be handled here -
* 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
* for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
- * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
+ * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
* follower is not catching up and should be removed from the ISR
**/
val leaderLogEndOffset = leaderReplica.logEndOffset
@@ -298,7 +298,7 @@ class Partition(val topic: String,
val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
- val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncBytes)
+ val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index b857d14..5dffa7e 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -144,14 +144,14 @@ object ConsoleConsumer extends Logging {
}
val props = new Properties()
- props.put("groupid", options.valueOf(groupIdOpt))
- props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
- props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
- props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
- props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
- props.put("autocommit.enable", "true")
- props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
- props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+ props.put("group.id", options.valueOf(groupIdOpt))
+ props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+ props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
+ props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
+ props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
+ props.put("auto.commit.enable", "true")
+ props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
+ props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
props.put("zk.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
val config = new ConsumerConfig(props)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index b379c9d..45db07b 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -52,11 +52,11 @@ object ConsumerConfig extends Config {
}
def validateClientId(clientId: String) {
- validateChars("clientid", clientId)
+ validateChars("client.id", clientId)
}
def validateGroupId(groupId: String) {
- validateChars("groupid", groupId)
+ validateChars("group.id", groupId)
}
def validateAutoOffsetReset(autoOffsetReset: String) {
@@ -77,38 +77,38 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
}
/** a string that uniquely identifies a set of consumers within the same consumer group */
- val groupId = props.getString("groupid")
+ val groupId = props.getString("group.id")
/** consumer id: generated automatically if not set.
* Set this explicitly for only testing purpose. */
- val consumerId: Option[String] = Option(props.getString("consumerid", null))
+ val consumerId: Option[String] = Option(props.getString("consumer.id", null))
/** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
- val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
+ val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
- val fetchSize = props.getInt("fetch.size", FetchSize)
+ val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
- val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
+ val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
- val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
+ val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
/** max number of messages buffered for consumption */
- val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
+ val queuedMaxMessages = props.getInt("queued.max.messages", MaxQueuedChunks)
/** max number of retries during rebalance */
- val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
+ val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
- val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
+ val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
- /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
- val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs)
+ /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
+ val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
@@ -120,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
- val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
+ val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
@@ -129,12 +129,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
* Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
* overhead of decompression.
* */
- val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
+ val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false)
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients
*/
- val clientId = props.getString("clientid", groupId)
+ val clientId = props.getString("client.id", groupId)
validate(this)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 2ce024c..713c7c9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -33,11 +33,11 @@ class ConsumerFetcherThread(name: String,
clientId = config.clientId + "-" + name,
sourceBroker = sourceBroker,
socketTimeout = config.socketTimeoutMs,
- socketBufferSize = config.socketBufferSize,
- fetchSize = config.fetchSize,
+ socketBufferSize = config.socketReceiveBufferBytes,
+ fetchSize = config.fetchMessageMaxBytes,
fetcherBrokerId = Request.OrdinaryConsumerId,
- maxWait = config.maxFetchWaitMs,
- minBytes = config.minFetchBytes) {
+ maxWait = config.fetchWaitMaxMs,
+ minBytes = config.fetchMinBytes) {
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index aee9293..42a9628 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
connectZk()
createFetcher()
- if (config.autoCommit) {
+ if (config.autoCommitEnable) {
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
@@ -160,14 +160,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
try {
- if (config.autoCommit)
+ if (config.autoCommitEnable)
scheduler.shutdownNow()
fetcher match {
case Some(f) => f.shutdown
case None =>
}
sendShutdownToAllQueues()
- if (config.autoCommit)
+ if (config.autoCommitEnable)
commitOffsets()
if (zkClient != null) {
zkClient.close()
@@ -194,9 +194,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
- val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
- queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId)
+ queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId)
(queue, stream)
})
).flatten.toList
@@ -365,7 +365,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def syncedRebalance() {
rebalanceLock synchronized {
- for (i <- 0 until config.maxRebalanceRetries) {
+ for (i <- 0 until config.rebalanceMaxRetries) {
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
val cluster = getCluster(zkClient)
@@ -393,7 +393,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
- throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
+ throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
private def rebalance(cluster: Cluster): Boolean = {
@@ -610,7 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
queue,
consumedOffset,
fetchedOffset,
- new AtomicInteger(config.fetchSize),
+ new AtomicInteger(config.fetchMessageMaxBytes),
config.clientId)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
@@ -709,12 +709,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val wildcardQueuesAndStreams = (1 to numStreams)
.map(e => {
- val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](queue,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
- config.enableShallowIterator,
+ config.shallowIteratorEnable,
config.clientId)
(queue, stream)
}).toList
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 5f0148c..497cfdd 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -43,15 +43,15 @@ private[kafka] class LogManager(val config: KafkaConfig,
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
- private val logFileSizeMap = config.logFileSizeMap
- private val logFlushInterval = config.flushInterval
- private val logFlushIntervals = config.flushIntervalMap
+ private val logFileSizeMap = config.logSegmentBytesPerTopicMap
+ private val logFlushInterval = config.logFlushIntervalMessages
+ private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
private val logCreationLock = new Object
- private val logRetentionSizeMap = config.logRetentionSizeMap
- private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
- private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
+ private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
+ private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+ private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
- private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+ private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
@@ -111,14 +111,14 @@ private[kafka] class LogManager(val config: KafkaConfig,
info("Loading log '" + dir.getName + "'")
val topicPartition = parseTopicPartitionName(dir.getName)
val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+ val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
val log = new Log(dir,
maxLogFileSize,
- config.maxMessageSize,
+ config.messageMaxBytes,
logFlushInterval,
rollIntervalMs,
needsRecovery,
- config.logIndexMaxSizeBytes,
+ config.logIndexSizeMaxBytes,
config.logIndexIntervalBytes,
time,
config.brokerId)
@@ -139,10 +139,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
if(scheduler != null) {
info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
- info("Starting log flusher every " + config.flushSchedulerThreadRate +
+ info("Starting log flusher every " + config.logFlushSchedulerIntervalMs +
" ms with the following overrides " + logFlushIntervals)
scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
- config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+ config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false)
}
}
@@ -186,14 +186,14 @@ private[kafka] class LogManager(val config: KafkaConfig,
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+ val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
log = new Log(dir,
maxLogFileSize,
- config.maxMessageSize,
+ config.messageMaxBytes,
logFlushInterval,
rollIntervalMs,
needsRecovery = false,
- config.logIndexMaxSizeBytes,
+ config.logIndexSizeMaxBytes,
config.logIndexIntervalBytes,
time,
config.brokerId)
@@ -249,7 +249,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = parseTopicPartitionName(log.dir.getName).topic
- val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+ val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
@@ -310,7 +310,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
for (log <- allLogs) {
try {
val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
- var logFlushInterval = config.defaultFlushIntervalMs
+ var logFlushInterval = config.logFlushIntervalMs
if(logFlushIntervals.contains(log.topicName))
logFlushInterval = logFlushIntervals(log.topicName)
debug(log.topicName + " flush interval " + logFlushInterval +
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 4e2f2af..1a98174 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -125,12 +125,12 @@ object ConsoleProducer {
props.put("compression.codec", codec.toString)
props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))
- props.put("batch.size", batchSize.toString)
- props.put("queue.time", sendTimeout.toString)
- props.put("queue.size", queueSize.toString)
+ props.put("batch.num.messages", batchSize.toString)
+ props.put("queue.buffering.max.ms", sendTimeout.toString)
+ props.put("queue.buffering.max.messages", queueSize.toString)
props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
- props.put("producer.request.required.acks", requestRequiredAcks.toString)
- props.put("producer.request.timeout.ms", requestTimeoutMs.toString)
+ props.put("request.required.acks", requestRequiredAcks.toString)
+ props.put("request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
props.put("serializer.class", valueEncoderClass)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index a7c101a..af077e0 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -73,8 +73,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
//These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
if(producerType != null) props.put("producer.type", producerType)
if(compressionCodec != null) props.put("compression.codec", compressionCodec)
- if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
- if(queueSize != null) props.put("queue.size", queueSize)
+ if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
+ if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + config.brokerList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index a183525..66638f2 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -31,7 +31,7 @@ class Producer[K,V](config: ProducerConfig,
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
- private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
+ private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
private val random = new Random
private var sync: Boolean = true
@@ -44,8 +44,8 @@ class Producer[K,V](config: ProducerConfig,
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
queue,
eventHandler,
- config.queueTime,
- config.batchSize,
+ config.queueBufferingMaxMs,
+ config.batchNumMessages,
config.clientId)
producerSendThread.start()
}
@@ -87,17 +87,17 @@ class Producer[K,V](config: ProducerConfig,
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
- val added = config.enqueueTimeoutMs match {
+ val added = config.queueEnqueueTimeoutMs match {
case 0 =>
queue.offer(message)
case _ =>
try {
- config.enqueueTimeoutMs < 0 match {
+ config.queueEnqueueTimeoutMs < 0 match {
case true =>
queue.put(message)
true
case _ =>
- queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+ queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 235b228..e27ec44 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -26,12 +26,12 @@ import kafka.common.{InvalidConfigException, Config}
object ProducerConfig extends Config {
def validate(config: ProducerConfig) {
validateClientId(config.clientId)
- validateBatchSize(config.batchSize, config.queueSize)
+ validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages)
validateProducerType(config.producerType)
}
def validateClientId(clientId: String) {
- validateChars("clientid", clientId)
+ validateChars("client.id", clientId)
}
def validateBatchSize(batchSize: Int, queueSize: Int) {
@@ -101,17 +101,16 @@ class ProducerConfig private (val props: VerifiableProperties)
*/
val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
- /**
- * The producer using the zookeeper software load balancer maintains a ZK cache that gets
- * updated by the zookeeper watcher listeners. During some events like a broker bounce, the
- * producer ZK cache can get into an inconsistent state, for a small time period. In this time
- * period, it could end up picking a broker partition that is unavailable. When this happens, the
- * ZK cache needs to be updated.
- * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
- */
- val producerRetries = props.getInt("producer.num.retries", 3)
+ /** The leader may be unavailable transiently, which can fail the sending of a message.
+ * This property specifies the number of retries when such failures occur.
+ */
+ val messageSendMaxRetries = props.getInt("message.send.max.retries", 3)
- val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+ /** Before each retry, the producer refreshes the metadata of relevant topics. Since leader
+ * election takes a bit of time, this property specifies the amount of time that the producer
+ * waits before refreshing the metadata.
+ */
+ val retryBackoffMs = props.getInt("retry.backoff.ms", 100)
/**
* The producer generally refreshes the topic metadata from brokers when there is a failure
@@ -121,7 +120,7 @@ class ProducerConfig private (val props: VerifiableProperties)
* Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
* a message the metadata is never refreshed
*/
- val topicMetadataRefreshIntervalMs = props.getInt("producer.metadata.refresh.interval.ms", 600000)
+ val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
validate(this)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 0ef320b..0469a39 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -36,7 +36,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
private val lock = new Object()
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
- config.bufferSize, config.requestTimeoutMs)
+ config.sendBufferBytes, config.requestTimeoutMs)
val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 5ebd29a..ef32620 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -36,24 +36,22 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP
trait SyncProducerConfigShared {
val props: VerifiableProperties
- val bufferSize = props.getInt("buffer.size", 100*1024)
-
- val maxMessageSize = props.getInt("max.message.size", 1000000)
+ val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024)
/* the client application sending the producer requests */
- val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
+ val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
/*
* The required acks of the producer requests - negative value means ack
* after the replicas in ISR have caught up to the leader's offset
* corresponding to this produce request.
*/
- val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+ val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
/*
* The ack timeout of the producer requests. Value must be non-negative and non-zero
*/
- val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
+ val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
(1, Integer.MAX_VALUE))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
index 07935d7..973fa08 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
@@ -22,10 +22,10 @@ trait AsyncProducerConfig {
val props: VerifiableProperties
/* maximum time, in milliseconds, for buffering data on the producer queue */
- val queueTime = props.getInt("queue.time", 5000)
+ val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000)
/** the maximum size of the blocking queue for buffering on the producer */
- val queueSize = props.getInt("queue.size", 10000)
+ val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000)
/**
* Timeout for event enqueue:
@@ -33,10 +33,10 @@ trait AsyncProducerConfig {
* -ve: enqueue will block indefinitely if the queue is full
* +ve: enqueue will block up to this many milliseconds if the queue is full
*/
- val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0)
+ val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0)
/** the number of messages batched at the producer */
- val batchSize = props.getInt("batch.size", 200)
+ val batchNumMessages = props.getInt("batch.num.messages", 200)
/** the serializer class for values */
val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 58f582f..9a4e4bc 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -59,7 +59,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
- var remainingRetries = config.producerRetries + 1
+ var remainingRetries = config.messageSendMaxRetries + 1
val correlationIdStart = correlationId.get()
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
@@ -72,7 +72,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.size > 0) {
// back off and update the topic metadata cache before attempting another send operation
- Thread.sleep(config.producerRetryBackoffMs)
+ Thread.sleep(config.retryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
remainingRetries -= 1
@@ -81,9 +81,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
+
val correlationIdEnd = correlationId.get()
error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests))
- throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
+ throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
}
}
}
@@ -231,7 +232,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement
- val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requiredAcks,
+ val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
var failedTopicPartitions = Seq.empty[TopicAndPartition]
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e2dfb3e..60752fb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -41,9 +41,9 @@ class KafkaApis(val requestChannel: RequestChannel,
brokerId: Int) extends Logging {
private val producerRequestPurgatory =
- new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+ new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
private val fetchRequestPurgatory =
- new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
+ new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
private val delayedRequestMetrics = new DelayedRequestMetrics
private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -442,7 +442,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ErrorMapping.UnknownTopicOrPartitionCode =>
try {
/* check if auto creation of topics is turned on */
- if (config.autoCreateTopics) {
+ if (config.autoCreateTopicsEnable) {
try {
CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 962b65f..f65db33 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -36,37 +36,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/*********** General Configuration ***********/
/* the broker id for this server */
- val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
+ val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
/* the maximum size of message that the server can receive */
- val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
+ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
- val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
+ val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
/* the number of io threads that the server uses for carrying out network requests */
- val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+ val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
/* the number of queued requests allowed before blocking the network threads */
- val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+ val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
/*********** Socket Server Configuration ***********/
/* the port to listen and accept connections on */
val port: Int = props.getInt("port", 6667)
- /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
+ /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
* it will bind to all interfaces, and publish one to ZK */
- val hostName: String = props.getString("hostname", null)
+ val hostName: String = props.getString("host.name", null)
/* the SO_SNDBUFF buffer of the socket sever sockets */
- val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
+ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
/* the SO_RCVBUFF buffer of the socket sever sockets */
- val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
+ val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024)
/* the maximum number of bytes in a socket request */
- val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+ val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
/*********** Log Configuration ***********/
@@ -74,56 +74,56 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
/* the directories in which the log data is kept */
- val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", "")))
+ val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs")))
require(logDirs.size > 0)
/* the maximum size of a single log file */
- val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+ val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
- val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt)
+ val logSegmentBytesPerTopicMap = props.getMap("log.segment.bytes.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the maximum time before a new log segment is rolled out */
val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
- val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt)
+ val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
- val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt)
+ val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the maximum size of the log before deleting it */
- val logRetentionSize = props.getLong("log.retention.size", -1)
+ val logRetentionBytes = props.getLong("log.retention.bytes", -1)
/* the maximum size of the log for some specific topic before deleting it */
- val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong)
+ val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong)
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
- val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+ val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
/* the maximum size in bytes of the offset index */
- val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size", 10*1024*1024, (4, Int.MaxValue))
+ val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
/* the interval with which we add an entry to the offset index */
val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
- val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
+ val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
- val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+ val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
- val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
+ val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms", 3000)
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
- val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
+ val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs)
/* enable auto creation of topic on the server */
- val autoCreateTopics = props.getBoolean("auto.create.topics", true)
+ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
/*********** Replication configuration ***********/
@@ -136,36 +136,38 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* default replication factors for automatically created topics */
val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
- val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
+ /* If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr */
+ val replicaLagTimeMaxMs = props.getLong("replica.lag.time.max.ms", 10000)
- val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
+ /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */
+ val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)
/* the socket timeout for network requests */
val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
/* the socket receive buffer for network requests */
- val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
+ val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)
/* the number of byes of messages to attempt to fetch */
- val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
+ val replicaFetchMaxBytes = props.getInt("replica.fetch.max.bytes", ConsumerConfig.FetchSize)
/* max wait time for each fetcher request issued by follower replicas*/
- val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
+ val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500)
/* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
- val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 1)
+ val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1)
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
- val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+ val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
- /* the frequency with which the highwater mark is saved out to disk */
- val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+ /* the frequency with which the high watermark is saved out to disk */
+ val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
/* the purge interval (in number of requests) of the fetch request purgatory */
- val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+ val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
/* the purge interval (in number of requests) of the producer request purgatory */
- val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+ val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ae35e4f..1fe1ca9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -65,8 +65,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.hostName,
config.port,
config.numNetworkThreads,
- config.numQueuedRequests,
- config.maxSocketRequestSize)
+ config.queuedMaxRequests,
+ config.socketRequestMaxBytes)
socketServer.startup
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c1d3235..6ae601e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -31,11 +31,11 @@ class ReplicaFetcherThread(name:String,
clientId = FetchRequest.ReplicaFetcherClientId,
sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs,
- socketBufferSize = brokerConfig.replicaSocketBufferSize,
- fetchSize = brokerConfig.replicaFetchSize,
+ socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
+ fetchSize = brokerConfig.replicaFetchMaxBytes,
fetcherBrokerId = brokerConfig.brokerId,
- maxWait = brokerConfig.replicaMaxWaitTimeMs,
- minBytes = brokerConfig.replicaMinBytes) {
+ maxWait = brokerConfig.replicaFetchWaitMaxMs,
+ minBytes = brokerConfig.replicaFetchMinBytes) {
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42068ca..064af6b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaConfig,
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
- kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
+ kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs)
}
/**
@@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
- kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+ kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
@@ -244,7 +244,7 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
leaderPartitionsLock synchronized {
- leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
+ leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 36a119b..1f5c7ba 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -182,9 +182,9 @@ public class KafkaMigrationTool
Properties kafkaConsumerProperties_07 = new Properties();
kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
- if(kafkaConsumerProperties_07.getProperty("shallowiterator.enable", "").equals("true")){
+ if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
logger.warn("Shallow iterator should not be used in the migration tool");
- kafkaConsumerProperties_07.setProperty("shallowiterator.enable", "false");
+ kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
}
Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index db14c82..d744a78 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -42,12 +42,12 @@ object ReplayLogProducer extends Logging {
// consumer properties
val consumerProps = new Properties
- consumerProps.put("groupid", GroupId)
+ consumerProps.put("group.id", GroupId)
consumerProps.put("zk.connect", config.zkConnect)
consumerProps.put("consumer.timeout.ms", "10000")
- consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
- consumerProps.put("fetch.size", (1024*1024).toString)
- consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
+ consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
+ consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
+ consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
val consumerConfig = new ConsumerConfig(consumerProps)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
@@ -141,10 +141,10 @@ object ReplayLogProducer extends Logging {
val props = new Properties()
props.put("broker.list", config.brokerList)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
- props.put("buffer.size", (64*1024).toString)
+ props.put("send.buffer.bytes", (64*1024).toString)
props.put("compression.codec", config.compressionCodec.codec.toString)
- props.put("batch.size", config.batchSize.toString)
- props.put("queue.enqueueTimeout.ms", "-1")
+ props.put("batch.num.messages", config.batchSize.toString)
+ props.put("queue.enqueue.timeout.ms", "-1")
if(config.isAsync)
props.put("producer.type", "async")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 5ba5938..f594404 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -785,11 +785,11 @@ class ZKConfig(props: VerifiableProperties) {
val zkConnect = props.getString("zk.connect", null)
/** zookeeper session timeout */
- val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000)
+ val zkSessionTimeoutMs = props.getInt("zk.session.timeout.ms", 6000)
/** the max time that the client waits to establish a connection to zookeeper */
- val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs)
+ val zkConnectionTimeoutMs = props.getInt("zk.connection.timeout.ms",zkSessionTimeoutMs)
/** how far a ZK follower can be behind a ZK leader */
- val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000)
+ val zkSyncTimeMs = props.getInt("zk.sync.time.ms", 2000)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index 5be4f4e..98c12b7 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -35,9 +35,9 @@ object TestEndToEndLatency {
val topic = "test"
val consumerProps = new Properties()
- consumerProps.put("groupid", topic)
+ consumerProps.put("group.id", topic)
consumerProps.put("auto.commit", "true")
- consumerProps.put("autooffset.reset", "largest")
+ consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zk.connect", zkConnect)
consumerProps.put("socket.timeout.ms", 1201000.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
index 75c33e0..9f3bb40 100644
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@ -33,7 +33,7 @@ object TestLogPerformance {
val props = TestUtils.createBrokerConfig(0, -1)
val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
- val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
+ val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 5b72eed..31534ca 100644
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -31,7 +31,7 @@ object TestZKConsumerOffsets {
val topic = args(1)
val autoOffsetReset = args(2)
val props = Utils.loadProps(args(0))
- props.put("autooffset.reset", "largest")
+ props.put("auto.offset.reset", "largest")
val config = new ConsumerConfig(props)
val consumerConnector: ConsumerConnector = Consumer.create(config)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index d7945a5..4c646f0 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -78,9 +78,9 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
- consumerProps.put("autooffset.reset", resetTo)
+ consumerProps.put("auto.offset.reset", resetTo)
consumerProps.put("consumer.timeout.ms", "2000")
- consumerProps.put("max.fetch.wait.ms", "0")
+ consumerProps.put("fetch.wait.max.ms", "0")
val consumerConfig = new ConsumerConfig(consumerProps)
TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", offset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index caea858..0fde254 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -35,12 +35,12 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
val props = new Properties()
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
- props.put("buffer.size", "65536")
+ props.put("send.buffer.bytes", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
- props.put("producer.retry.backoff.ms", "1000")
- props.put("producer.num.retries", "3")
- props.put("producer.request.required.acks", "-1")
+ props.put("retry.backoff.ms", "1000")
+ props.put("message.send.max.retries", "3")
+ props.put("request.required.acks", "-1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b06d812..ce893bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -40,8 +40,8 @@ class LogManagerTest extends JUnit3Suite {
override def setUp() {
super.setUp()
config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
- override val logFileSize = 1024
- override val flushInterval = 10000
+ override val logSegmentBytes = 1024
+ override val logFlushIntervalMessages = 10000
override val logRetentionHours = maxLogAgeHours
}
scheduler.startup
@@ -114,10 +114,10 @@ class LogManagerTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
- override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
- override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
+ override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
+ override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
override val logRetentionHours = retentionHours
- override val flushInterval = 100
+ override val logFlushIntervalMessages = 100
override val logRollHours = maxRollInterval
}
logManager = new LogManager(config, scheduler, time)
@@ -158,11 +158,11 @@ class LogManagerTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
- override val logFileSize = 1024 *1024 *1024
- override val flushSchedulerThreadRate = 50
- override val flushInterval = Int.MaxValue
+ override val logSegmentBytes = 1024 *1024 *1024
+ override val logFlushSchedulerIntervalMs = 50
+ override val logFlushIntervalMessages = Int.MaxValue
override val logRollHours = maxRollInterval
- override val flushIntervalMap = Map("timebasedflush" -> 100)
+ override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100)
}
logManager = new LogManager(config, scheduler, time)
logManager.startup
@@ -173,7 +173,7 @@ class LogManagerTest extends JUnit3Suite {
}
val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
- ellapsed < 2*config.flushSchedulerThreadRate)
+ ellapsed < 2*config.logFlushSchedulerIntervalMs)
}
@Test
@@ -183,7 +183,7 @@ class LogManagerTest extends JUnit3Suite {
val dirs = Seq(TestUtils.tempDir().getAbsolutePath,
TestUtils.tempDir().getAbsolutePath,
TestUtils.tempDir().getAbsolutePath)
- props.put("log.directories", dirs.mkString(","))
+ props.put("log.dirs", dirs.mkString(","))
logManager.shutdown()
logManager = new LogManager(new KafkaConfig(props), scheduler, time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index c6ea3b6..b343d98 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -198,15 +198,15 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
- props.put("brokerid", nodeId.toString)
+ props.put("broker.id", nodeId.toString)
props.put("port", port.toString)
props.put("log.dir", getLogDir.getAbsolutePath)
- props.put("log.flush.interval", "1")
+ props.put("log.flush.interval.messages", "1")
props.put("enable.zookeeper", "false")
props.put("num.partitions", "20")
props.put("log.retention.hours", "10")
props.put("log.cleanup.interval.mins", "5")
- props.put("log.file.size", logSize.toString)
+ props.put("log.segment.bytes", logSize.toString)
props.put("zk.connect", zkConnect.toString)
props
}