You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/01/15 17:04:38 UTC

kafka git commit: KAFKA-1499; Trivial follow-up (fix comments and whitespace)

Repository: kafka
Updated Branches:
  refs/heads/trunk 1c8f89bc7 -> 45697ed6c


KAFKA-1499; Trivial follow-up (fix comments and whitespace)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45697ed6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45697ed6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45697ed6

Branch: refs/heads/trunk
Commit: 45697ed6c6132475b5dd4dd0069a2a192a75d329
Parents: 1c8f89b
Author: Joel Koshy <jj...@gmail.com>
Authored: Thu Jan 15 08:04:29 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Jan 15 08:04:29 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala                     | 6 +++---
 core/src/main/scala/kafka/log/LogConfig.scala               | 7 +++----
 core/src/main/scala/kafka/message/CompressionCodec.scala    | 6 ++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala          | 9 ++++-----
 core/src/test/scala/kafka/log/LogConfigTest.scala           | 2 +-
 .../scala/unit/kafka/message/ByteBufferMessageSetTest.scala | 4 ++--
 6 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 86422bf..846023b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -41,8 +41,8 @@ object LogAppendInfo {
  * @param lastOffset The last offset in the message set
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
- * @param sourceCodec The source codec used in the message set(coming from producer)
- * @param targetCodec The target codec of the message set(after applying broker compression logic)
+ * @param sourceCodec The source codec used in the message set (sent by the producer)
+ * @param targetCodec The target codec of the message set (after applying the broker compression configuration if any)
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  */
 case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
@@ -395,7 +395,7 @@ class Log(val dir: File,
         sourceCodec = messageCodec
     }
 
-    //Apply if any broker-side compression
+    // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
     
     LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 2338b44..4631bc7 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -152,10 +152,9 @@ object LogConfig {
   val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
   val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" +
     " -1 (or all) required acks"
-  val CompressionTypeDoc = "This parameter allows you to specify the compression logic for a given topic. This config" +
-    " is used to retain/remove/change the compression set by the producer. This config takes the following options: " +
-    " uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker" +
-    " writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer"
+  val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " +
+    "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
+    "no compression; and 'producer' which means retain the original compression codec set by the producer."
 
   private val configDef = {
     import ConfigDef.Range._

http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index cacde9b..c4aa8ce 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -53,8 +53,10 @@ object BrokerCompressionCodec {
   }
 
   def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = {
-    if (ProducerCompressionCodec.name.equals(compressionType)) producerCompression
-    else getCompressionCodec(compressionType)
+    if (ProducerCompressionCodec.name.equals(compressionType))
+      producerCompression
+    else
+      getCompressionCodec(compressionType)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9d1adec..d3d8ac4 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -348,11 +348,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
 
   /**
-   * This parameter allows you to specify the broker-side compression logic. This config is used to
-   * retain/remove/change the compression set by the producer. This config takes the following options:
-   * uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker
-   * writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer"
-  */
+   * Specify the final compression type for a given topic. This configuration accepts the standard compression codecs
+   * ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and
+   * 'producer' which means retain the original compression codec set by the producer.
+   */
   val compressionType = props.getString("compression.type", "producer").toLowerCase()
   require(BrokerCompressionCodec.isValid(compressionType), "compression.type : "+compressionType + " is not valid." +
       " Valid options are "+BrokerCompressionCodec.brokerCompressionOptions.mkString(","))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
index fe5bd9d..9690f14 100644
--- a/core/src/test/scala/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/kafka/log/LogConfigTest.scala
@@ -51,7 +51,7 @@ class LogConfigTest extends JUnit3Suite {
     LogConfig.configNames().foreach((name) => {
       name match {
         case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
-        case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer","uncompressed","gzip"))
+        case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip"))
         case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
         case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
         case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45697ed6/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 716254a..73a2637 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets 
     checkOffsets(messages, 0)
     var offset = 1234567
-    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec , NoCompressionCodec), offset)
+    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec , DefaultCompressionCodec), offset)
+    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
   }
   
   /* check that offsets are assigned based on byte offset from the given base offset */