Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/18 23:07:40 UTC

[2/2] kafka git commit: kafka-2249; KafkaConfig does not preserve original Properties; patched by Gwen Shapira; reviewed by Jun Rao

kafka-2249; KafkaConfig does not preserve original Properties; patched by Gwen Shapira; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c904074
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c904074
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c904074

Branch: refs/heads/trunk
Commit: 5c9040745466945a04ea0315de583ccdab0614ac
Parents: ba86f0a
Author: Gwen Shapira <cs...@gmail.com>
Authored: Thu Jun 18 14:07:33 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jun 18 14:07:33 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     |  12 +-
 .../main/scala/kafka/cluster/Partition.scala    |   2 +-
 .../kafka/controller/KafkaController.scala      |   4 +-
 .../controller/PartitionLeaderSelector.scala    |   2 +-
 core/src/main/scala/kafka/log/LogConfig.scala   | 156 ++---
 core/src/main/scala/kafka/log/LogManager.scala  |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   4 +-
 .../main/scala/kafka/server/KafkaConfig.scala   | 573 +++++--------------
 .../main/scala/kafka/server/KafkaServer.scala   |  55 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/TopicConfigManager.scala |   5 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  26 -
 .../test/scala/other/kafka/StressTestLog.scala  |  10 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |   7 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   7 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |  55 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   8 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  19 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  17 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 121 +++-
 .../kafka/server/DynamicConfigChangeTest.scala  |  17 +-
 .../kafka/server/KafkaConfigConfigDefTest.scala |  20 +-
 22 files changed, 444 insertions(+), 682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index c4fa058..bae528d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -57,15 +57,19 @@ public class AbstractConfig {
         return values.get(key);
     }
 
-    public int getInt(String key) {
+    public Short getShort(String key) {
+        return (Short) get(key);
+    }
+
+    public Integer getInt(String key) {
         return (Integer) get(key);
     }
 
-    public long getLong(String key) {
+    public Long getLong(String key) {
         return (Long) get(key);
     }
 
-    public double getDouble(String key) {
+    public Double getDouble(String key) {
         return (Double) get(key);
     }
 
@@ -92,7 +96,7 @@ public class AbstractConfig {
         return keys;
     }
 
-    public Map<String, ?> originals() {
+    public Map<String, Object> originals() {
         Map<String, Object> copy = new HashMap<String, Object>();
         copy.putAll(originals);
         return copy;
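
The getter changes above are the client-side half of the fix: originals() now has a concrete Map<String, Object> return type, getShort is added for short-typed properties such as the offsets topic replication factor, and the getters return boxed types so an optional key with no default can come back as null. The sketch below (Scala, with a hypothetical one-key MyConfig subclass that is not part of the patch) shows the behavior the rest of the patch relies on: whatever map the user supplies survives verbatim in originals(), including keys the ConfigDef never defined.

    import java.util.Properties
    import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
    import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

    object OriginalsDemo {
      // Hypothetical single-key schema, defined only for this illustration.
      private val configDef = new ConfigDef()
        .define("num.threads", Type.INT, 4, Importance.MEDIUM, "worker thread count")

      class MyConfig(props: java.util.Map[_, _]) extends AbstractConfig(configDef, props)

      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("num.threads", "8")
        props.put("custom.key", "opaque")          // not in the ConfigDef
        val cfg = new MyConfig(props)
        println(cfg.getInt("num.threads"))         // parsed and typed: 8
        println(cfg.originals.get("custom.key"))   // preserved verbatim: opaque
      }
    }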

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 730a232..6cb6477 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -86,7 +86,7 @@ class Partition(val topic: String,
       case Some(replica) => replica
       case None =>
         if (isReplicaLocal(replicaId)) {
-          val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
+          val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic))
           val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
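
This call site, and the two controller call sites below, all make the same substitution: config.toProps, which re-serialized only the fields the config class knew to copy back, becomes config.originals, the map the user actually supplied. A sketch of the merge these sites perform, layering per-topic ZooKeeper overrides on top of broker defaults (the property name is real; the value is illustrative, and a logManager is assumed in scope as in the hunk above):

    import java.util.Properties
    import kafka.log.LogConfig

    val defaults = logManager.defaultConfig.originals       // broker-level log settings, as supplied
    val overrides = new Properties()
    overrides.put(LogConfig.RetentionMsProp, "86400000")    // per-topic override: one day
    val topicConfig = LogConfig.fromProps(defaults, overrides)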

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 69bba24..3635057 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -325,7 +325,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
         info("starting the partition rebalance scheduler")
         autoRebalanceScheduler.startup()
         autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
-          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
+          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
       }
       deleteTopicManager.start()
     }
@@ -1013,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
             // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
             // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient,
+            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient,
               topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
               newIsr = leaderAndIsr.isr

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3b15ab4..bb6b5c8 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
           case true =>
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
             // for unclean leader election.
-            if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
+            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
               topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                 "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index f64fd79..e9af221 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -18,92 +18,52 @@
 package kafka.log
 
 import java.util.Properties
+import kafka.server.KafkaConfig
 import org.apache.kafka.common.utils.Utils
 import scala.collection._
-import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import kafka.message.BrokerCompressionCodec
 import kafka.message.Message
 
 object Defaults {
-  val SegmentSize = 1024 * 1024
-  val SegmentMs = Long.MaxValue
-  val SegmentJitterMs = 0L
-  val FlushInterval = Long.MaxValue
-  val FlushMs = Long.MaxValue
-  val RetentionSize = Long.MaxValue
-  val RetentionMs = Long.MaxValue
-  val MaxMessageSize = Int.MaxValue
-  val MaxIndexSize = 1024 * 1024
-  val IndexInterval = 4096
-  val FileDeleteDelayMs = 60 * 1000L
-  val DeleteRetentionMs = 24 * 60 * 60 * 1000L
-  val MinCleanableDirtyRatio = 0.5
-  val Compact = false
-  val UncleanLeaderElectionEnable = true
-  val MinInSyncReplicas = 1
-  val CompressionType = "producer"
+  val SegmentSize = kafka.server.Defaults.LogSegmentBytes
+  val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L
+  val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L
+  val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages
+  val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs
+  val RetentionSize = kafka.server.Defaults.LogRetentionBytes
+  val RetentionMs = kafka.server.Defaults.LogRetentionHours * 60 * 60 * 1000L
+  val MaxMessageSize = kafka.server.Defaults.MessageMaxBytes
+  val MaxIndexSize = kafka.server.Defaults.LogIndexSizeMaxBytes
+  val IndexInterval = kafka.server.Defaults.LogIndexIntervalBytes
+  val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
+  val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
+  val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
+  val Compact = kafka.server.Defaults.LogCleanupPolicy
+  val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
+  val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
+  val CompressionType = kafka.server.Defaults.CompressionType
 }
 
-/**
- * Configuration settings for a log
- * @param segmentSize The hard maximum for the size of a segment file in the log
- * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
- * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
- * @param flushInterval The number of messages that can be written to the log before a flush is forced
- * @param flushMs The amount of time the log can have dirty data before a flush is forced
- * @param retentionSize The approximate total number of bytes this log can use
- * @param retentionMs The approximate maximum age of the last segment that is retained
- * @param maxIndexSize The maximum size of an index file
- * @param indexInterval The approximate number of bytes between index entries
- * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
- * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
- * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param compact Should old segments in this log be deleted or deduplicated?
- * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
- * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
- * @param compressionType compressionType for a given topic
- *
- */
-case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
-                     segmentMs: Long = Defaults.SegmentMs,
-                     segmentJitterMs: Long = Defaults.SegmentJitterMs,
-                     flushInterval: Long = Defaults.FlushInterval,
-                     flushMs: Long = Defaults.FlushMs,
-                     retentionSize: Long = Defaults.RetentionSize,
-                     retentionMs: Long = Defaults.RetentionMs,
-                     maxMessageSize: Int = Defaults.MaxMessageSize,
-                     maxIndexSize: Int = Defaults.MaxIndexSize,
-                     indexInterval: Int = Defaults.IndexInterval,
-                     fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
-                     deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
-                     minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
-                     compact: Boolean = Defaults.Compact,
-                     uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
-                     minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
-                     compressionType: String = Defaults.CompressionType) {
-
-  def toProps: Properties = {
-    val props = new Properties()
-    import LogConfig._
-    props.put(SegmentBytesProp, segmentSize.toString)
-    props.put(SegmentMsProp, segmentMs.toString)
-    props.put(SegmentJitterMsProp, segmentJitterMs.toString)
-    props.put(SegmentIndexBytesProp, maxIndexSize.toString)
-    props.put(FlushMessagesProp, flushInterval.toString)
-    props.put(FlushMsProp, flushMs.toString)
-    props.put(RetentionBytesProp, retentionSize.toString)
-    props.put(RetentionMsProp, retentionMs.toString)
-    props.put(MaxMessageBytesProp, maxMessageSize.toString)
-    props.put(IndexIntervalBytesProp, indexInterval.toString)
-    props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
-    props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
-    props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
-    props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
-    props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
-    props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
-    props.put(CompressionTypeProp, compressionType)
-    props
-  }
+case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
+
+  val segmentSize = getInt(LogConfig.SegmentBytesProp)
+  val segmentMs = getLong(LogConfig.SegmentMsProp)
+  val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)
+  val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp)
+  val flushInterval = getLong(LogConfig.FlushMessagesProp)
+  val flushMs = getLong(LogConfig.FlushMsProp)
+  val retentionSize = getLong(LogConfig.RetentionBytesProp)
+  val retentionMs = getLong(LogConfig.RetentionMsProp)
+  val maxMessageSize = getInt(LogConfig.MaxMessageBytesProp)
+  val indexInterval = getInt(LogConfig.IndexIntervalBytesProp)
+  val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
+  val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
+  val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
+  val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase != LogConfig.Delete
+  val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
+  val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
+  val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -111,6 +71,10 @@ case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
 
 object LogConfig {
 
+  def main(args: Array[String]) {
+    System.out.println(configDef.toHtmlTable)
+  }
+
   val Delete = "delete"
   val Compact = "compact"
 
@@ -179,7 +143,7 @@ object LogConfig {
       .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
         MinCleanableRatioDoc)
-      .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(Compact, Delete), MEDIUM,
+      .define(CleanupPolicyProp, STRING, Defaults.Compact, in(Compact, Delete), MEDIUM,
         CompactDoc)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
         MEDIUM, UncleanLeaderElectionEnableDoc)
@@ -187,6 +151,8 @@ object LogConfig {
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
   }
 
+  def apply(): LogConfig = LogConfig(new Properties())
+
   def configNames() = {
     import JavaConversions._
     configDef.names().toList.sorted
@@ -194,37 +160,13 @@ object LogConfig {
 
 
   /**
-   * Parse the given properties instance into a LogConfig object
-   */
-  def fromProps(props: Properties): LogConfig = {
-    import kafka.utils.CoreUtils.evaluateDefaults
-    val parsed = configDef.parse(evaluateDefaults(props))
-    new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
-                  segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
-                  segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
-                  maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
-                  flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
-                  flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
-                  retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
-                  retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
-                  maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
-                  indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
-                  fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
-                  deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
-                  minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
-                  compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
-                  uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
-                  minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
-                  compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase())
-  }
-
-  /**
    * Create a log config instance using the given properties and defaults
    */
-  def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
-    val props = new Properties(defaults)
+  def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
+    val props = new Properties()
+    props.putAll(defaults)
     props.putAll(overrides)
-    fromProps(props)
+    LogConfig(props)
   }
 
   /**
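
The quiet but important change in this file is in fromProps: the old version built new Properties(defaults), and a Properties constructed with chained defaults answers getProperty for those keys but does not expose them to containsKey or iteration, which is how ConfigDef-style parsing reads the map, so chained defaults were silently dropped (the evaluateDefaults workaround removed from CoreUtils in this patch existed to flatten exactly this chaining). The patched version copies the defaults in with putAll before applying overrides. A standalone demonstration of that java.util.Properties behavior (not code from the patch):

    import java.util.Properties

    object ChainedDefaultsDemo extends App {
      val defaults = new Properties()
      defaults.put("retention.ms", "604800000")

      val chained = new Properties(defaults)         // defaults consulted by getProperty only
      println(chained.getProperty("retention.ms"))   // 604800000
      println(chained.containsKey("retention.ms"))   // false: iteration-based parsing misses it

      val copied = new Properties()
      copied.putAll(defaults)                        // what the patched fromProps does
      println(copied.containsKey("retention.ms"))    // true
    }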

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index e781eba..538fc83 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -356,7 +356,7 @@ class LogManager(val logDirs: Array[File],
            .format(topicAndPartition.topic, 
                    topicAndPartition.partition, 
                    dataDir.getAbsolutePath,
-                   {import JavaConversions._; config.toProps.mkString(", ")}))
+                   {import JavaConversions._; config.originals.mkString(", ")}))
       log
     }
   }
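
Since originals is a java.util.Map rather than a Properties, formatting it for the log message goes through JavaConversions, as the hunk shows. A minimal equivalent (a sketch, assuming any AbstractConfig-backed config named config is in scope):

    import scala.collection.JavaConversions._
    val rendered = config.originals.mkString(", ")   // e.g. "segment.bytes -> 1073741824, ..."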

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c7debe4..ad6f058 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -428,9 +428,9 @@ class KafkaApis(val requestChannel: RequestChannel,
               val aliveBrokers = metadataCache.getAliveBrokers
               val offsetsTopicReplicationFactor =
                 if (aliveBrokers.length > 0)
-                  Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
+                  Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
                 else
-                  config.offsetsTopicReplicationFactor
+                  config.offsetsTopicReplicationFactor.toInt
               AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
                                      offsetsTopicReplicationFactor,
                                      offsetManager.offsetsTopicConfig)
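
The added .toInt follows from the typed getters: offsetsTopicReplicationFactor is now read with getShort and comes back as a boxed java.lang.Short, which Scala will not silently widen inside Math.min(Int, Int). The same boxed returns are what make the optional-key handling in the new KafkaConfig below work, since an optional property with no default now yields null instead of an unboxing error. A sketch of that idiom (cfg stands for any KafkaConfig instance; property names are real):

    // null when advertised.port is unset (it has no default), so Option models absence cleanly
    val advertised: Int = Option(cfg.getInt(KafkaConfig.AdvertisedPortProp))
      .map(_.intValue)
      .getOrElse(cfg.getInt(KafkaConfig.PortProp).intValue)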

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2d75186..e0b2480 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,7 +26,7 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection.{mutable, immutable, JavaConversions, Map}
@@ -141,6 +141,10 @@ object Defaults {
 
 object KafkaConfig {
 
+  def main(args: Array[String]) {
+    System.out.println(configDef.toHtmlTable)
+  }
+
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectProp = "zookeeper.connect"
   val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
@@ -482,14 +486,14 @@ object KafkaConfig {
       .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
       .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
       .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
-      .define(LeaderImbalanceCheckIntervalSecondsProp, INT, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
+      .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
       .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
       .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
 
       /** ********* Controlled shutdown configuration ***********/
       .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
-      .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
+      .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
       .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
 
       /** ********* Consumer coordinator configuration ***********/
@@ -520,139 +524,6 @@ object KafkaConfig {
   }
 
   /**
-   * Parse the given properties instance into a KafkaConfig object
-   */
-  def fromProps(props: Properties): KafkaConfig = {
-    import kafka.utils.CoreUtils.evaluateDefaults
-    val parsed = configDef.parse(evaluateDefaults(props))
-    new KafkaConfig(
-      /** ********* Zookeeper Configuration ***********/
-      zkConnect = parsed.get(ZkConnectProp).asInstanceOf[String],
-      zkSessionTimeoutMs = parsed.get(ZkSessionTimeoutMsProp).asInstanceOf[Int],
-      _zkConnectionTimeoutMs = Option(parsed.get(ZkConnectionTimeoutMsProp)).map(_.asInstanceOf[Int]),
-      zkSyncTimeMs = parsed.get(ZkSyncTimeMsProp).asInstanceOf[Int],
-
-      /** ********* General Configuration ***********/
-      maxReservedBrokerId = parsed.get(MaxReservedBrokerIdProp).asInstanceOf[Int],
-      brokerId = parsed.get(BrokerIdProp).asInstanceOf[Int],
-      messageMaxBytes = parsed.get(MessageMaxBytesProp).asInstanceOf[Int],
-      numNetworkThreads = parsed.get(NumNetworkThreadsProp).asInstanceOf[Int],
-      numIoThreads = parsed.get(NumIoThreadsProp).asInstanceOf[Int],
-      backgroundThreads = parsed.get(BackgroundThreadsProp).asInstanceOf[Int],
-      queuedMaxRequests = parsed.get(QueuedMaxRequestsProp).asInstanceOf[Int],
-
-      /** ********* Socket Server Configuration ***********/
-      port = parsed.get(PortProp).asInstanceOf[Int],
-      hostName = parsed.get(HostNameProp).asInstanceOf[String],
-      _listeners = Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]),
-      _advertisedHostName = Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]),
-      _advertisedPort = Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]),
-      _advertisedListeners = Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]),
-      socketSendBufferBytes = parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int],
-      socketReceiveBufferBytes = parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int],
-      socketRequestMaxBytes = parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int],
-      maxConnectionsPerIp = parsed.get(MaxConnectionsPerIpProp).asInstanceOf[Int],
-      _maxConnectionsPerIpOverrides = parsed.get(MaxConnectionsPerIpOverridesProp).asInstanceOf[String],
-      connectionsMaxIdleMs = parsed.get(ConnectionsMaxIdleMsProp).asInstanceOf[Long],
-
-      /** ********* Log Configuration ***********/
-      numPartitions = parsed.get(NumPartitionsProp).asInstanceOf[Int],
-      _logDir = parsed.get(LogDirProp).asInstanceOf[String],
-      _logDirs = Option(parsed.get(LogDirsProp)).map(_.asInstanceOf[String]),
-
-      logSegmentBytes = parsed.get(LogSegmentBytesProp).asInstanceOf[Int],
-      logRollTimeHours = parsed.get(LogRollTimeHoursProp).asInstanceOf[Int],
-      _logRollTimeMillis = Option(parsed.get(LogRollTimeMillisProp)).map(_.asInstanceOf[Long]),
-
-      logRollTimeJitterHours = parsed.get(LogRollTimeJitterHoursProp).asInstanceOf[Int],
-      _logRollTimeJitterMillis = Option(parsed.get(LogRollTimeJitterMillisProp)).map(_.asInstanceOf[Long]),
-
-      logRetentionTimeHours = parsed.get(LogRetentionTimeHoursProp).asInstanceOf[Int],
-      _logRetentionTimeMins = Option(parsed.get(LogRetentionTimeMinutesProp)).map(_.asInstanceOf[Int]),
-      _logRetentionTimeMillis = Option(parsed.get(LogRetentionTimeMillisProp)).map(_.asInstanceOf[Long]),
-
-      logRetentionBytes = parsed.get(LogRetentionBytesProp).asInstanceOf[Long],
-      logCleanupIntervalMs = parsed.get(LogCleanupIntervalMsProp).asInstanceOf[Long],
-      logCleanupPolicy = parsed.get(LogCleanupPolicyProp).asInstanceOf[String],
-      logCleanerThreads = parsed.get(LogCleanerThreadsProp).asInstanceOf[Int],
-      logCleanerIoMaxBytesPerSecond = parsed.get(LogCleanerIoMaxBytesPerSecondProp).asInstanceOf[Double],
-      logCleanerDedupeBufferSize = parsed.get(LogCleanerDedupeBufferSizeProp).asInstanceOf[Long],
-      logCleanerIoBufferSize = parsed.get(LogCleanerIoBufferSizeProp).asInstanceOf[Int],
-      logCleanerDedupeBufferLoadFactor = parsed.get(LogCleanerDedupeBufferLoadFactorProp).asInstanceOf[Double],
-      logCleanerBackoffMs = parsed.get(LogCleanerBackoffMsProp).asInstanceOf[Long],
-      logCleanerMinCleanRatio = parsed.get(LogCleanerMinCleanRatioProp).asInstanceOf[Double],
-      logCleanerEnable = parsed.get(LogCleanerEnableProp).asInstanceOf[Boolean],
-      logCleanerDeleteRetentionMs = parsed.get(LogCleanerDeleteRetentionMsProp).asInstanceOf[Long],
-      logIndexSizeMaxBytes = parsed.get(LogIndexSizeMaxBytesProp).asInstanceOf[Int],
-      logIndexIntervalBytes = parsed.get(LogIndexIntervalBytesProp).asInstanceOf[Int],
-      logFlushIntervalMessages = parsed.get(LogFlushIntervalMessagesProp).asInstanceOf[Long],
-      logDeleteDelayMs = parsed.get(LogDeleteDelayMsProp).asInstanceOf[Long],
-      logFlushSchedulerIntervalMs = parsed.get(LogFlushSchedulerIntervalMsProp).asInstanceOf[Long],
-      _logFlushIntervalMs = Option(parsed.get(LogFlushIntervalMsProp)).map(_.asInstanceOf[Long]),
-      logFlushOffsetCheckpointIntervalMs = parsed.get(LogFlushOffsetCheckpointIntervalMsProp).asInstanceOf[Int],
-      numRecoveryThreadsPerDataDir = parsed.get(NumRecoveryThreadsPerDataDirProp).asInstanceOf[Int],
-      autoCreateTopicsEnable = parsed.get(AutoCreateTopicsEnableProp).asInstanceOf[Boolean],
-      minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
-
-      /** ********* Replication configuration ***********/
-      controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int],
-      defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int],
-      replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long],
-      replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int],
-      replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int],
-      replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
-      replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int],
-      replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int],
-      replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int],
-      numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int],
-      replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long],
-      fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
-      producerPurgatoryPurgeIntervalRequests = parsed.get(ProducerPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
-      autoLeaderRebalanceEnable = parsed.get(AutoLeaderRebalanceEnableProp).asInstanceOf[Boolean],
-      leaderImbalancePerBrokerPercentage = parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int],
-      leaderImbalanceCheckIntervalSeconds = parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int],
-      uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
-      interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
-      interBrokerProtocolVersion =  ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
-
-      /** ********* Controlled shutdown configuration ***********/
-      controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
-      controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
-      controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean],
-
-      /** ********* Consumer coordinator configuration ***********/
-      consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int],
-      consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int],
-
-      /** ********* Offset management configuration ***********/
-      offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int],
-      offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int],
-      offsetsTopicReplicationFactor = parsed.get(OffsetsTopicReplicationFactorProp).asInstanceOf[Short],
-      offsetsTopicPartitions = parsed.get(OffsetsTopicPartitionsProp).asInstanceOf[Int],
-      offsetsTopicSegmentBytes = parsed.get(OffsetsTopicSegmentBytesProp).asInstanceOf[Int],
-      offsetsTopicCompressionCodec = Option(parsed.get(OffsetsTopicCompressionCodecProp)).map(_.asInstanceOf[Int]).map(value => CompressionCodec.getCompressionCodec(value)).orNull,
-      offsetsRetentionMinutes = parsed.get(OffsetsRetentionMinutesProp).asInstanceOf[Int],
-      offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long],
-      offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int],
-      offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short],
-      deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean],
-      compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String],
-      metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int],
-      metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long],
-      _metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]]
-    )
-  }
-
-  /**
-   * Create a log config instance using the given properties and defaults
-   */
-  def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
-    val props = new Properties(defaults)
-    props.putAll(overrides)
-    fromProps(props)
-  }
-
-  /**
    * Check that property names are valid
    */
   def validateNames(props: Properties) {
@@ -662,171 +533,149 @@ object KafkaConfig {
       require(names.contains(name), "Unknown configuration \"%s\".".format(name))
   }
 
-  /**
-   * Check that the given properties contain only valid kafka config names and that all values can be parsed and are valid
-   */
-  def validate(props: Properties) {
-    validateNames(props)
-    configDef.parse(props)
+  def fromProps(props: Properties): KafkaConfig = {
+    KafkaConfig(props)
+  }
 
-    // to bootstrap KafkaConfig.validateValues()
-    KafkaConfig.fromProps(props)
+  def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
+    val props = new Properties()
+    props.putAll(defaults)
+    props.putAll(overrides)
+    fromProps(props)
   }
+
 }
 
-class KafkaConfig (/** ********* Zookeeper Configuration ***********/
-                  val zkConnect: String,
-                  val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs,
-                  private val _zkConnectionTimeoutMs: Option[Int] = None,
-                  val zkSyncTimeMs: Int = Defaults.ZkSyncTimeMs,
-
-                  /** ********* General Configuration ***********/
-                  val maxReservedBrokerId: Int = Defaults.MaxReservedBrokerId,
-                  var brokerId: Int = Defaults.BrokerId,
-                  val messageMaxBytes: Int = Defaults.MessageMaxBytes,
-                  val numNetworkThreads: Int = Defaults.NumNetworkThreads,
-                  val numIoThreads: Int = Defaults.NumIoThreads,
-                  val backgroundThreads: Int = Defaults.BackgroundThreads,
-                  val queuedMaxRequests: Int = Defaults.QueuedMaxRequests,
-
-                  /** ********* Socket Server Configuration ***********/
-                  val port: Int = Defaults.Port,
-                  val hostName: String = Defaults.HostName,
-                  private val _listeners: Option[String] = None,
-                  private val _advertisedHostName: Option[String] = None,
-                  private val _advertisedPort: Option[Int] = None,
-                  private val _advertisedListeners: Option[String] = None,
-                  val socketSendBufferBytes: Int = Defaults.SocketSendBufferBytes,
-                  val socketReceiveBufferBytes: Int = Defaults.SocketReceiveBufferBytes,
-                  val socketRequestMaxBytes: Int = Defaults.SocketRequestMaxBytes,
-                  val maxConnectionsPerIp: Int = Defaults.MaxConnectionsPerIp,
-                  private val _maxConnectionsPerIpOverrides: String = Defaults.MaxConnectionsPerIpOverrides,
-                  val connectionsMaxIdleMs: Long = Defaults.ConnectionsMaxIdleMs,
-
-                  /** ********* Log Configuration ***********/
-                  val numPartitions: Int = Defaults.NumPartitions,
-                  private val _logDir: String = Defaults.LogDir,
-                  private val _logDirs: Option[String] = None,
-
-                  val logSegmentBytes: Int = Defaults.LogSegmentBytes,
-
-                  val logRollTimeHours: Int = Defaults.LogRollHours,
-                  private val _logRollTimeMillis: Option[Long] = None,
-
-                  val logRollTimeJitterHours: Int = Defaults.LogRollJitterHours,
-                  private val _logRollTimeJitterMillis: Option[Long] = None,
-
-                  val logRetentionTimeHours: Int = Defaults.LogRetentionHours,
-                  private val _logRetentionTimeMins: Option[Int] = None,
-                  private val _logRetentionTimeMillis: Option[Long] = None,
-
-                  val logRetentionBytes: Long = Defaults.LogRetentionBytes,
-                  val logCleanupIntervalMs: Long = Defaults.LogCleanupIntervalMs,
-                  val logCleanupPolicy: String = Defaults.LogCleanupPolicy,
-                  val logCleanerThreads: Int = Defaults.LogCleanerThreads,
-                  val logCleanerIoMaxBytesPerSecond: Double = Defaults.LogCleanerIoMaxBytesPerSecond,
-                  val logCleanerDedupeBufferSize: Long = Defaults.LogCleanerDedupeBufferSize,
-                  val logCleanerIoBufferSize: Int = Defaults.LogCleanerIoBufferSize,
-                  val logCleanerDedupeBufferLoadFactor: Double = Defaults.LogCleanerDedupeBufferLoadFactor,
-                  val logCleanerBackoffMs: Long = Defaults.LogCleanerBackoffMs,
-                  val logCleanerMinCleanRatio: Double = Defaults.LogCleanerMinCleanRatio,
-                  val logCleanerEnable: Boolean = Defaults.LogCleanerEnable,
-                  val logCleanerDeleteRetentionMs: Long = Defaults.LogCleanerDeleteRetentionMs,
-                  val logIndexSizeMaxBytes: Int = Defaults.LogIndexSizeMaxBytes,
-                  val logIndexIntervalBytes: Int = Defaults.LogIndexIntervalBytes,
-                  val logFlushIntervalMessages: Long = Defaults.LogFlushIntervalMessages,
-                  val logDeleteDelayMs: Long = Defaults.LogDeleteDelayMs,
-                  val logFlushSchedulerIntervalMs: Long = Defaults.LogFlushSchedulerIntervalMs,
-                  private val _logFlushIntervalMs: Option[Long] = None,
-                  val logFlushOffsetCheckpointIntervalMs: Int = Defaults.LogFlushOffsetCheckpointIntervalMs,
-                  val numRecoveryThreadsPerDataDir: Int = Defaults.NumRecoveryThreadsPerDataDir,
-                  val autoCreateTopicsEnable: Boolean = Defaults.AutoCreateTopicsEnable,
-
-                  val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
-
-                  /** ********* Replication configuration ***********/
-                  val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs,
-                  val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor,
-                  val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs,
-                  val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs,
-                  val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes,
-                  val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
-                  val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs,
-                  val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes,
-                  val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs,
-                  val numReplicaFetchers: Int = Defaults.NumReplicaFetchers,
-                  val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs,
-                  val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests,
-                  val producerPurgatoryPurgeIntervalRequests: Int = Defaults.ProducerPurgatoryPurgeIntervalRequests,
-                  val autoLeaderRebalanceEnable: Boolean = Defaults.AutoLeaderRebalanceEnable,
-                  val leaderImbalancePerBrokerPercentage: Int = Defaults.LeaderImbalancePerBrokerPercentage,
-                  val leaderImbalanceCheckIntervalSeconds: Int = Defaults.LeaderImbalanceCheckIntervalSeconds,
-                  val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
-                  val interBrokerSecurityProtocol: SecurityProtocol = SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol),
-                  val interBrokerProtocolVersion: ApiVersion = ApiVersion(Defaults.InterBrokerProtocolVersion),
-
-                  /** ********* Controlled shutdown configuration ***********/
-                  val controlledShutdownMaxRetries: Int = Defaults.ControlledShutdownMaxRetries,
-                  val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs,
-                  val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable,
-
-                  /** ********* Consumer coordinator configuration ***********/
-                  val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs,
-                  val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs,
-
-                  /** ********* Offset management configuration ***********/
-                  val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize,
-                  val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize,
-                  val offsetsTopicReplicationFactor: Short = Defaults.OffsetsTopicReplicationFactor,
-                  val offsetsTopicPartitions: Int = Defaults.OffsetsTopicPartitions,
-                  val offsetsTopicSegmentBytes: Int = Defaults.OffsetsTopicSegmentBytes,
-                  val offsetsTopicCompressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(Defaults.OffsetsTopicCompressionCodec),
-                  val offsetsRetentionMinutes: Int = Defaults.OffsetsRetentionMinutes,
-                  val offsetsRetentionCheckIntervalMs: Long = Defaults.OffsetsRetentionCheckIntervalMs,
-                  val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs,
-                  val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks,
-
-                  val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable,
-                  val compressionType: String = Defaults.CompressionType,
-
-                  val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs,
-                  val metricNumSamples: Int = Defaults.MetricNumSamples,
-                  private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses)
-                   ) {
-
-  val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs)
-
-  val listeners = getListeners()
-  val advertisedHostName: String = _advertisedHostName.getOrElse(hostName)
-  val advertisedPort: Int = _advertisedPort.getOrElse(port)
-  val advertisedListeners = getAdvertisedListeners()
-  val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir))
-
-  val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours)
-  val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours)
-  val logRetentionTimeMillis = getLogRetentionTimeMillis
+case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
+
+  /** ********* Zookeeper Configuration ***********/
+  val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
+  val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
+  val zkConnectionTimeoutMs: java.lang.Integer =
+    Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
+  val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
 
-  val logFlushIntervalMs = _logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs)
+  /** ********* General Configuration ***********/
+  val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
+  var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
+  val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
+  val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
+  val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
+  val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
+  val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
 
+  /** ********* Socket Server Configuration ***********/
+  val hostName = getString(KafkaConfig.HostNameProp)
+  val port = getInt(KafkaConfig.PortProp)
+  val advertisedHostName = Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName)
+  val advertisedPort: java.lang.Integer = Option(getInt(KafkaConfig.AdvertisedPortProp)).getOrElse(port)
+
+  val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
+  val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
+  val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp)
+  val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
   val maxConnectionsPerIpOverrides: Map[String, Int] =
-    getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)}
+    getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
+  val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
+
 
-  val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses)
+  /** ********* Log Configuration ***********/
+  val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
+  val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
+  val logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
+  val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
+  val logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp)
+  val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
+  val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
+  val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
+  val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
+  val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
+  val logCleanupPolicy = getString(KafkaConfig.LogCleanupPolicyProp)
+  val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
+  val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
+  val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
+  val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
+  val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
+  val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
+  val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
+  val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
+  val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
+  val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
+  val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
+  val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
+  val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
+  val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
+  val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp))
+  val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
+  val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+  val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
+
+  /** ********* Replication configuration ***********/
+  val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
+  val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp)
+  val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
+  val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
+  val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
+  val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
+  val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
+  val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
+  val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
+  val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
+  val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
+  val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
+  val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
+  val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
+  val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
+  val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
+  val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
+  val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
+
+  /** ********* Controlled shutdown configuration ***********/
+  val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
+  val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
+  val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
+
+  /** ********* Consumer coordinator configuration ***********/
+  val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp)
+  val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp)
+
+  /** ********* Offset management configuration ***********/
+  val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
+  val offsetsLoadBufferSize = getInt(KafkaConfig.OffsetsLoadBufferSizeProp)
+  val offsetsTopicReplicationFactor = getShort(KafkaConfig.OffsetsTopicReplicationFactorProp)
+  val offsetsTopicPartitions = getInt(KafkaConfig.OffsetsTopicPartitionsProp)
+  val offsetCommitTimeoutMs = getInt(KafkaConfig.OffsetCommitTimeoutMsProp)
+  val offsetCommitRequiredAcks = getShort(KafkaConfig.OffsetCommitRequiredAcksProp)
+  val offsetsTopicSegmentBytes = getInt(KafkaConfig.OffsetsTopicSegmentBytesProp)
+  val offsetsTopicCompressionCodec = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionCodec.getCompressionCodec(value)).orNull
+
+  /** ********* Metric Configuration **************/
+  val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
+  val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
+  val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
+
+  val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
+  val compressionType = getString(KafkaConfig.CompressionTypeProp)
+
+
+  val listeners = getListeners
+  val advertisedListeners = getAdvertisedListeners
+  val logRetentionTimeMillis = getLogRetentionTimeMillis
 
   private def getLogRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
 
-    val millis = {
-      _logRetentionTimeMillis.getOrElse(
-        _logRetentionTimeMins match {
-          case Some(mins) => millisInMinute * mins
-          case None => millisInHour * logRetentionTimeHours
-        }
-      )
-    }
-    if (millis < 0) return -1
-    millis
+    val millis: java.lang.Long =
+      Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
+        Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
+          case Some(mins) =>  millisInMinute * mins
+          case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
+        })
+
+      if (millis < 0) return -1
+      millis
   }
 
   private def getMap(propName: String, propValue: String): Map[String, String] = {
@@ -855,9 +704,9 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
   // If the user did not define listeners but did define host or port, let's use them in backward compatible way
   // If none of those are defined, we default to PLAINTEXT://:9092
   private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
-    if (_listeners.isDefined) {
-      validateUniquePortAndProtocol(_listeners.get)
-      CoreUtils.listenerListToEndPoints(_listeners.get)
+    if (getString(KafkaConfig.ListenersProp) != null) {
+      validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp))
+      CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp))
     } else {
       CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
     }
@@ -867,11 +716,12 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
   // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
   // If none of these are defined, we'll use the listeners
   private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
-    if (_advertisedListeners.isDefined) {
-      validateUniquePortAndProtocol(_advertisedListeners.get)
-      CoreUtils.listenerListToEndPoints(_advertisedListeners.get)
-    } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) {
-      CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName  + ":" + advertisedPort)
+    if (getString(KafkaConfig.AdvertisedListenersProp) != null) {
+      validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp))
+      CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp))
+    } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
+      CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
+            getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp))
     } else {
       getListeners()
     }
@@ -886,7 +736,7 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
       val reporterName = iterator.next()
       if (!reporterName.isEmpty) {
         val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName)
-        reporter.configure(toProps.asInstanceOf[java.util.Map[String, _]])
+        reporter.configure(originals)
         reporterList.add(reporter)
       }
     }
@@ -895,19 +745,13 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
 
   }
 
-
-
   validateValues()
 
   private def validateValues() {
    require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal to or greater than -1 and not greater than reserved.broker.max.id")
    require(logRollTimeMillis >= 1, "log.roll.ms must be equal to or greater than 1")
    require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal to or greater than 0")
-
-    require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1")
-    require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1")
    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or equal to or greater than 1")
-
     require(logDirs.size > 0)
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
     require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
@@ -920,127 +764,4 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
    require(BrokerCompressionCodec.isValid(compressionType), "compression.type: " + compressionType + " is not valid." +
      " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
   }
-
-  def toProps: Properties = {
-    val props = new Properties()
-    import kafka.server.KafkaConfig._
-    /** ********* Zookeeper Configuration ***********/
-    props.put(ZkConnectProp, zkConnect)
-    props.put(ZkSessionTimeoutMsProp, zkSessionTimeoutMs.toString)
-    _zkConnectionTimeoutMs.foreach(value => props.put(ZkConnectionTimeoutMsProp, value.toString))
-    props.put(ZkSyncTimeMsProp, zkSyncTimeMs.toString)
-
-    /** ********* General Configuration ***********/
-    props.put(MaxReservedBrokerIdProp, maxReservedBrokerId.toString)
-    props.put(BrokerIdProp, brokerId.toString)
-    props.put(MessageMaxBytesProp, messageMaxBytes.toString)
-    props.put(NumNetworkThreadsProp, numNetworkThreads.toString)
-    props.put(NumIoThreadsProp, numIoThreads.toString)
-    props.put(BackgroundThreadsProp, backgroundThreads.toString)
-    props.put(QueuedMaxRequestsProp, queuedMaxRequests.toString)
-
-    /** ********* Socket Server Configuration ***********/
-    props.put(PortProp, port.toString)
-    props.put(HostNameProp, hostName)
-    _listeners.foreach(props.put(ListenersProp, _))
-    _advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _))
-    _advertisedPort.foreach(value => props.put(AdvertisedPortProp, value.toString))
-    _advertisedListeners.foreach(props.put(AdvertisedListenersProp, _))
-    props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString)
-    props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString)
-    props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString)
-    props.put(MaxConnectionsPerIpProp, maxConnectionsPerIp.toString)
-    props.put(MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides)
-    props.put(ConnectionsMaxIdleMsProp, connectionsMaxIdleMs.toString)
-
-    /** ********* Log Configuration ***********/
-    props.put(NumPartitionsProp, numPartitions.toString)
-    props.put(LogDirProp, _logDir)
-    _logDirs.foreach(value => props.put(LogDirsProp, value))
-    props.put(LogSegmentBytesProp, logSegmentBytes.toString)
-
-    props.put(LogRollTimeHoursProp, logRollTimeHours.toString)
-    _logRollTimeMillis.foreach(v => props.put(LogRollTimeMillisProp, v.toString))
-
-    props.put(LogRollTimeJitterHoursProp, logRollTimeJitterHours.toString)
-    _logRollTimeJitterMillis.foreach(v => props.put(LogRollTimeJitterMillisProp, v.toString))
-
-
-    props.put(LogRetentionTimeHoursProp, logRetentionTimeHours.toString)
-    _logRetentionTimeMins.foreach(v => props.put(LogRetentionTimeMinutesProp, v.toString))
-    _logRetentionTimeMillis.foreach(v => props.put(LogRetentionTimeMillisProp, v.toString))
-
-    props.put(LogRetentionBytesProp, logRetentionBytes.toString)
-    props.put(LogCleanupIntervalMsProp, logCleanupIntervalMs.toString)
-    props.put(LogCleanupPolicyProp, logCleanupPolicy)
-    props.put(LogCleanerThreadsProp, logCleanerThreads.toString)
-    props.put(LogCleanerIoMaxBytesPerSecondProp, logCleanerIoMaxBytesPerSecond.toString)
-    props.put(LogCleanerDedupeBufferSizeProp, logCleanerDedupeBufferSize.toString)
-    props.put(LogCleanerIoBufferSizeProp, logCleanerIoBufferSize.toString)
-    props.put(LogCleanerDedupeBufferLoadFactorProp, logCleanerDedupeBufferLoadFactor.toString)
-    props.put(LogCleanerBackoffMsProp, logCleanerBackoffMs.toString)
-    props.put(LogCleanerMinCleanRatioProp, logCleanerMinCleanRatio.toString)
-    props.put(LogCleanerEnableProp, logCleanerEnable.toString)
-    props.put(LogCleanerDeleteRetentionMsProp, logCleanerDeleteRetentionMs.toString)
-    props.put(LogIndexSizeMaxBytesProp, logIndexSizeMaxBytes.toString)
-    props.put(LogIndexIntervalBytesProp, logIndexIntervalBytes.toString)
-    props.put(LogFlushIntervalMessagesProp, logFlushIntervalMessages.toString)
-    props.put(LogDeleteDelayMsProp, logDeleteDelayMs.toString)
-    props.put(LogFlushSchedulerIntervalMsProp, logFlushSchedulerIntervalMs.toString)
-    _logFlushIntervalMs.foreach(v => props.put(LogFlushIntervalMsProp, v.toString))
-    props.put(LogFlushOffsetCheckpointIntervalMsProp, logFlushOffsetCheckpointIntervalMs.toString)
-    props.put(NumRecoveryThreadsPerDataDirProp, numRecoveryThreadsPerDataDir.toString)
-    props.put(AutoCreateTopicsEnableProp, autoCreateTopicsEnable.toString)
-    props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
-
-    /** ********* Replication configuration ***********/
-    props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString)
-    props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString)
-    props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-    props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString)
-    props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString)
-    props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)
-    props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-    props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
-    props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString)
-    props.put(NumReplicaFetchersProp, numReplicaFetchers.toString)
-    props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString)
-    props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString)
-    props.put(ProducerPurgatoryPurgeIntervalRequestsProp, producerPurgatoryPurgeIntervalRequests.toString)
-    props.put(AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
-    props.put(LeaderImbalancePerBrokerPercentageProp, leaderImbalancePerBrokerPercentage.toString)
-    props.put(LeaderImbalanceCheckIntervalSecondsProp, leaderImbalanceCheckIntervalSeconds.toString)
-    props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
-    props.put(InterBrokerSecurityProtocolProp, interBrokerSecurityProtocol.toString)
-    props.put(InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
-
-
-    /** ********* Controlled shutdown configuration ***********/
-    props.put(ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString)
-    props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString)
-    props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString)
-
-    /** ********* Consumer coordinator configuration ***********/
-    props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString)
-    props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString)
-
-    /** ********* Offset management configuration ***********/
-    props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString)
-    props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString)
-    props.put(OffsetsTopicReplicationFactorProp, offsetsTopicReplicationFactor.toString)
-    props.put(OffsetsTopicPartitionsProp, offsetsTopicPartitions.toString)
-    props.put(OffsetsTopicSegmentBytesProp, offsetsTopicSegmentBytes.toString)
-    props.put(OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)
-    props.put(OffsetsRetentionMinutesProp, offsetsRetentionMinutes.toString)
-    props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString)
-    props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString)
-    props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString)
-    props.put(DeleteTopicEnableProp, deleteTopicEnable.toString)
-    props.put(CompressionTypeProp, compressionType.toString)
-    props.put(MetricNumSamplesProp, metricNumSamples.toString)
-    props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString)
-    props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(","))
-
-    props
-  }
 }
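
A quick standalone sketch (not part of the patch) of the retention precedence that getLogRetentionTimeMillis above implements: log.retention.ms wins over log.retention.minutes, which wins over log.retention.hours, and a negative value is normalized to -1 (unlimited). The 168-hour fallback below is assumed to match the broker default.

    import java.util.Properties

    object RetentionPrecedenceSketch {
      // ms beats minutes beats hours; negative means unlimited
      def retentionMs(props: Properties): Long = {
        val millisInMinute = 60L * 1000L
        val millisInHour = 60L * millisInMinute
        val millis =
          Option(props.getProperty("log.retention.ms")).map(_.toLong).getOrElse(
            Option(props.getProperty("log.retention.minutes")) match {
              case Some(mins) => millisInMinute * mins.toLong
              case None => millisInHour * props.getProperty("log.retention.hours", "168").toLong
            })
        if (millis < 0) -1 else millis
      }

      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("log.retention.minutes", "30")
        println(retentionMs(props)) // 1800000 - minutes apply when ms is absent
      }
    }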

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b320ce9..9de2a6f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,6 +17,9 @@
 
 package kafka.server
 
+import java.util
+import java.util.Properties
+
 import kafka.admin._
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
@@ -388,23 +391,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def boundPort(): Int = socketServer.boundPort()
 
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
-    val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
-                                     segmentMs = config.logRollTimeMillis,
-                                     segmentJitterMs = config.logRollTimeJitterMillis,
-                                     flushInterval = config.logFlushIntervalMessages,
-                                     flushMs = config.logFlushIntervalMs.toLong,
-                                     retentionSize = config.logRetentionBytes,
-                                     retentionMs = config.logRetentionTimeMillis,
-                                     maxMessageSize = config.messageMaxBytes,
-                                     maxIndexSize = config.logIndexSizeMaxBytes,
-                                     indexInterval = config.logIndexIntervalBytes,
-                                     deleteRetentionMs = config.logCleanerDeleteRetentionMs,
-                                     fileDeleteDelayMs = config.logDeleteDelayMs,
-                                     minCleanableRatio = config.logCleanerMinCleanRatio,
-                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact",
-                                     minInSyncReplicas = config.minInSyncReplicas,
-                                     compressionType = config.compressionType)
-    val defaultProps = defaultLogConfig.toProps
+    val defaultProps = copyKafkaConfigToLog(config.originals)
+    val defaultLogConfig = LogConfig(defaultProps)
+
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
     val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
@@ -428,6 +417,38 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    time = time)
   }
 
+  // Copy the subset of properties that are relevant to Logs
+  // I'm listing out individual properties here since the names are slightly different in each Config class...
+  private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = {
+
+    val logProps = new util.HashMap[String, Object]()
+    val entryset = serverProps.entrySet.iterator
+    while (entryset.hasNext) {
+      val entry = entryset.next
+      entry.getKey match {
+        case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue)
+        case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue)
+        case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue)
+        case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue)
+        case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue)
+        case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue)
+        case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue)
+        case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue)
+        case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue)
+        case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue)
+        case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue)
+        case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue)
+        case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue)
+        case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue)
+        case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
+        case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
+        case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
+        case _ => // we just leave those out
+      }
+    }
+    logProps
+  }
+
   private def createOffsetManager(): OffsetManager = {
     val offsetManagerConfig = OffsetManagerConfig(
       maxMetadataSize = config.offsetMetadataMaxSize,

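The translate-and-filter idea behind copyKafkaConfigToLog can be sketched in isolation (not part of the patch); the two-entry mapping below is a hypothetical stand-in for the full case list above:

    import java.util

    object ConfigTranslationSketch {
      // only server keys with a log-level counterpart are copied,
      // and they are copied under the log-level name
      val serverToLog = Map(
        "log.segment.bytes" -> "segment.bytes",
        "log.retention.ms"  -> "retention.ms")

      def translate(serverProps: util.Map[String, Object]): util.Map[String, Object] = {
        val logProps = new util.HashMap[String, Object]()
        val it = serverProps.entrySet.iterator
        while (it.hasNext) {
          val entry = it.next()
          serverToLog.get(entry.getKey).foreach(logKey => logProps.put(logKey, entry.getValue))
        }
        logProps
      }

      def main(args: Array[String]): Unit = {
        val server = new util.HashMap[String, Object]()
        server.put("log.segment.bytes", Integer.valueOf(1048576))
        server.put("broker.id", Integer.valueOf(0)) // no log counterpart: dropped
        println(translate(server)) // {segment.bytes=1048576}
      }
    }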
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 181cbc1..c89d00b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -90,7 +90,7 @@ class ReplicaFetcherThread(name:String,
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
+      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
         topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
         fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
@@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs)
+    delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 }
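
Note the .toLong above: with KafkaConfig now backed by typed getters, replicaFetchBackoffMs surfaces as a boxed java.lang.Integer, so the conversion to the Long that delayPartitions expects is spelled out at the call site. A REPL-style sketch of the pattern (names hypothetical, not part of the patch):

    val backoffMs: java.lang.Integer = 1000                     // what a boxed getter returns
    val delay: Long = backoffMs.toLong                          // unbox and widen explicitly
    val safe: Option[Int] = Option(backoffMs).map(_.intValue)   // null-safe for keys without defaults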

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index b675a7e..01b1b0a 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -101,9 +101,10 @@ class TopicConfigManager(private val zkClient: ZkClient,
             val topic = json.substring(1, json.length - 1) // hacky way to dequote
             if (logsByTopic.contains(topic)) {
               /* combine the default properties with the overrides in zk to create the new LogConfig */
-              val props = new Properties(logManager.defaultConfig.toProps)
+              val props = new Properties()
+              props.putAll(logManager.defaultConfig.originals)
               props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
-              val logConfig = LogConfig.fromProps(props)
+              val logConfig = LogConfig(props)
               for (log <- logsByTopic(topic))
                 log.config = logConfig
               info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))

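The explicit putAll above is deliberate: defaults handed to the Properties constructor are visible to getProperty but not to get or to a later putAll, so the merge now copies the defaults as first-class entries before layering the ZooKeeper overrides on top. A standalone sketch of the merge (not part of the patch; the override values are hypothetical):

    import java.util.Properties

    object TopicOverrideSketch {
      def main(args: Array[String]): Unit = {
        val defaults = new Properties()
        defaults.put("retention.ms", "604800000")
        defaults.put("cleanup.policy", "delete")

        // stand-in for the map AdminUtils.fetchTopicConfig reads from ZooKeeper
        val overrides = new Properties()
        overrides.put("cleanup.policy", "compact")

        val merged = new Properties()
        merged.putAll(defaults)   // defaults become real entries, not constructor defaults
        merged.putAll(overrides)  // per-topic values win

        println(merged.getProperty("cleanup.policy")) // compact
        println(merged.getProperty("retention.ms"))   // 604800000
      }
    }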
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index d0a8fa7..f5d704c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -254,32 +254,6 @@ object CoreUtils extends Logging {
   }
 
   /**
-   * Turn {@linkplain java.util.Properties} with default values into a {@linkplain java.util.Map}. Following example
-   * illustrates difference from the cast
-   * <pre>
-   * val defaults = new Properties()
-   * defaults.put("foo", "bar")
-   * val props = new Properties(defaults)
-   *
-   * props.getProperty("foo") // "bar"
-   * props.get("foo") // null
-   * evaluateDefaults(props).get("foo") // "bar"
-   * </pre>
-   *
-   * @param props properties to evaluate
-   * @return new java.util.Map instance
-   */
-  def evaluateDefaults(props: Properties): java.util.Map[String, String] = {
-    import java.util._
-    import JavaConversions.asScalaSet
-    val evaluated = new HashMap[String, String]()
-    for (name <- props.stringPropertyNames()) {
-      evaluated.put(name, props.getProperty(name))
-    }
-    evaluated
-  }
-
-  /**
    * Read a big-endian integer from a byte array
    */
   def readInt(bytes: Array[Byte], offset: Int): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index c0e248d..225d77b 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -17,6 +17,7 @@
 
 package kafka
 
+import java.util.Properties
 import java.util.concurrent.atomic._
 import kafka.common._
 import kafka.message._
@@ -33,10 +34,13 @@ object StressTestLog {
   def main(args: Array[String]) {
     val dir = TestUtils.tempDir()
     val time = new MockTime
+    val logProperties = new Properties()
+    logProperties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)
+    logProperties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer)
+    logProperties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer)
+
     val log = new Log(dir = dir,
-                      config = LogConfig(segmentSize = 64*1024*1024,
-                                         maxMessageSize = Int.MaxValue,
-                                         maxIndexSize = 1024*1024),
+                      config = LogConfig(logProperties),
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)

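The ": java.lang.Integer" ascriptions above are load-bearing: Properties.put takes Object, and Scala will not apply its Int => java.lang.Integer implicit when the expected type is plain Object, so the ascription forces the boxing explicitly. A small sketch (not part of the patch):

    import java.util.Properties

    object BoxingSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // props.put("segment.bytes", 64 * 1024 * 1024)  // does not compile: Int is not an Object
        props.put("segment.bytes", 64 * 1024 * 1024: java.lang.Integer) // ascription boxes the Int
        props.put("flush.messages", 10000L: java.lang.Long)             // same trick for Long
        println(props)
      }
    }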
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 3034c4f..236d857 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -20,7 +20,7 @@ package kafka
 import java.io._
 import java.nio._
 import java.nio.channels._
-import java.util.Random
+import java.util.{Properties, Random}
 import kafka.log._
 import kafka.utils._
 import kafka.message._
@@ -110,7 +110,10 @@ object TestLinearWriteSpeed {
         writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer)
       } else if(options.has(logOpt)) {
         val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect
-        writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize, flushInterval = flushInterval), scheduler, messageSet)
+        val logProperties = new Properties()
+        logProperties.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+        logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long)
+        writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
       } else {
         System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") 
         System.exit(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 375555f..6180b87 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -26,7 +26,7 @@ import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.{ Collection, ArrayList }
+import java.util.{Properties, Collection, ArrayList}
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.record.CompressionType
 import scala.collection.JavaConversions._
@@ -54,9 +54,10 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
   @Test
   def testBrokerSideCompression() {
     val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression)
-
+    val logProps = new Properties()
+    logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
    /* configure broker-side compression */
-    val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append two messages */
     log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes)))

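With the case-class copy gone, the per-log compression override is built the same way as every other test config: a Properties object keyed by the LogConfig constants. A sketch of the override on its own (not part of the patch; "gzip" is an arbitrary example codec):

    import java.util.Properties
    import kafka.log.LogConfig

    val logProps = new Properties()
    logProps.put(LogConfig.CompressionTypeProp, "gzip") // broker-side recompression for this log
    val config = LogConfig(logProps)                    // then passed to new Log(...) as above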
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8b8249a..0e2a6a1 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import java.util.Properties
+
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Test}
@@ -35,7 +37,11 @@ import org.apache.kafka.common.utils.Utils
 class CleanerTest extends JUnitSuite {
   
   val dir = TestUtils.tempDir()
-  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
+  val logProps = new Properties()
+  logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  val logConfig = LogConfig(logProps)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
   
@@ -50,8 +56,11 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testCleanSegments() {
     val cleaner = makeCleaner(Int.MaxValue)
-    val log = makeLog(config = logConfig.copy(segmentSize = 1024))
-    
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
@@ -72,7 +81,10 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testCleaningWithDeletes() {
     val cleaner = makeCleaner(Int.MaxValue)
-    val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
     
     // append messages with the keys 0 through N
     while(log.numberOfSegments < 2)
@@ -98,7 +110,11 @@ class CleanerTest extends JUnitSuite {
     val cleaner = makeCleaner(Int.MaxValue)
 
     // create a log with compaction turned off so we can append unkeyed messages
-    val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false))
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // append unkeyed messages
     while(log.numberOfSegments < 2)
@@ -114,7 +130,9 @@ class CleanerTest extends JUnitSuite {
     val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
 
     // turn on compaction and compact the log
-    val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024))
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
+    val compactedLog = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
 
     assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
@@ -139,7 +157,10 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testCleanSegmentsWithAbort() {
     val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
-    val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
@@ -159,7 +180,11 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testSegmentGrouping() {
     val cleaner = makeCleaner(Int.MaxValue)
-    val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1))
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
     
     // append some messages to the log
     var i = 0
@@ -208,7 +233,12 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testSegmentGroupingWithSparseOffsets() {
     val cleaner = makeCleaner(Int.MaxValue)
-    val log = makeLog(config = logConfig.copy(segmentSize = 1024, indexInterval = 1))
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
        
     // fill up first segment
     while (log.numberOfSegments == 1)
@@ -288,7 +318,12 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testRecoveryAfterCrash() {
     val cleaner = makeCleaner(Int.MaxValue)
-    val config = logConfig.copy(segmentSize = 300, indexInterval = 1, fileDeleteDelayMs = 10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
+
+    val config = LogConfig.fromProps(logConfig.originals, logProps)
       
     def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {   
       // Recover log file and check that after recovery, keys are as expected

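The rewritten tests above all follow the same layering idiom: start from the suite-level logConfig and overlay per-test properties with LogConfig.fromProps(defaults, overrides). A sketch of that layering (not part of the patch; it assumes fromProps gives the override map precedence and that the parsed value surfaces as segmentSize):

    import java.util.Properties
    import kafka.log.LogConfig

    val baseProps = new Properties()
    baseProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
    baseProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
    val base = LogConfig(baseProps)

    // per-test override: only segment.bytes changes, cleanup.policy is inherited
    val overrideProps = new Properties()
    overrideProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
    val derived = LogConfig.fromProps(base.originals, overrideProps)
    // derived.segmentSize == 300 (assumed field name); compaction still enabled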
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 471ddff..381e9aa 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,7 @@
 package kafka.log
 
 import java.io.File
+import java.util.Properties
 
 import kafka.common.TopicAndPartition
 import kafka.message._
@@ -127,8 +128,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
     for(i <- 0 until parts) {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
+      val logProps = new Properties()
+      logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+      logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
+      logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
+      logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
       val log = new Log(dir = dir,
-                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
+                        LogConfig(logProps),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)