You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/01/15 17:04:38 UTC
kafka git commit: KAFKA-1499;
Trivial follow-up (fix comments and whitespace)
Repository: kafka
Updated Branches:
refs/heads/trunk 1c8f89bc7 -> 45697ed6c
KAFKA-1499; Trivial follow-up (fix comments and whitespace)
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45697ed6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45697ed6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45697ed6
Branch: refs/heads/trunk
Commit: 45697ed6c6132475b5dd4dd0069a2a192a75d329
Parents: 1c8f89b
Author: Joel Koshy <jj...@gmail.com>
Authored: Thu Jan 15 08:04:29 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Jan 15 08:04:29 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 6 +++---
core/src/main/scala/kafka/log/LogConfig.scala | 7 +++----
core/src/main/scala/kafka/message/CompressionCodec.scala | 6 ++++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 9 ++++-----
core/src/test/scala/kafka/log/LogConfigTest.scala | 2 +-
.../scala/unit/kafka/message/ByteBufferMessageSetTest.scala | 4 ++--
6 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 86422bf..846023b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -41,8 +41,8 @@ object LogAppendInfo {
* @param lastOffset The last offset in the message set
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
- * @param sourceCodec The source codec used in the message set(coming from producer)
- * @param targetCodec The target codec of the message set(after applying broker compression logic)
+ * @param sourceCodec The source codec used in the message set (sent by the producer)
+ * @param targetCodec The target codec of the message set (after applying the broker compression configuration if any)
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
@@ -395,7 +395,7 @@ class Log(val dir: File,
sourceCodec = messageCodec
}
- //Apply if any broker-side compression
+ // Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 2338b44..4631bc7 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -152,10 +152,9 @@ object LogConfig {
val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" +
" -1 (or all) required acks"
- val CompressionTypeDoc = "This parameter allows you to specify the compression logic for a given topic. This config" +
- " is used to retain/remove/change the compression set by the producer. This config takes the following options: " +
- " uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker" +
- " writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer"
+ val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " +
"standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to " +
+ "no compression; and 'producer' which means retain the original compression codec set by the producer."
private val configDef = {
import ConfigDef.Range._
http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index cacde9b..c4aa8ce 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -53,8 +53,10 @@ object BrokerCompressionCodec {
}
def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = {
- if (ProducerCompressionCodec.name.equals(compressionType)) producerCompression
- else getCompressionCodec(compressionType)
+ if (ProducerCompressionCodec.name.equals(compressionType))
+ producerCompression
+ else
+ getCompressionCodec(compressionType)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9d1adec..d3d8ac4 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -348,11 +348,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
/**
- * This parameter allows you to specify the broker-side compression logic. This config is used to
- * retain/remove/change the compression set by the producer. This config takes the following options:
- * uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker
- * writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer"
- */
+ * Specify the final compression type for a given topic. This configuration accepts the standard compression codecs
* ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and
* 'producer' which means retain the original compression codec set by the producer.
+ */
val compressionType = props.getString("compression.type", "producer").toLowerCase()
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : "+compressionType + " is not valid." +
" Valid options are "+BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
index fe5bd9d..9690f14 100644
--- a/core/src/test/scala/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/kafka/log/LogConfigTest.scala
@@ -51,7 +51,7 @@ class LogConfigTest extends JUnit3Suite {
LogConfig.configNames().foreach((name) => {
name match {
case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer","uncompressed","gzip"))
+ case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip"))
case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 716254a..73a2637 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
// check uncompressed offsets
checkOffsets(messages, 0)
var offset = 1234567
- checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec , NoCompressionCodec), offset)
+ checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
// check compressed messages
checkOffsets(compressedMessages, 0)
- checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec , DefaultCompressionCodec), offset)
+ checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
}
/* check that offsets are assigned based on byte offset from the given base offset */