You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/14 03:29:08 UTC
svn commit: r1384624 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/log/ main/scala/kafka/network/ main/scala/kafka/server/
test/scala/unit/kafka/log/ test/scala/unit/kafka/network/
Author: junrao
Date: Fri Sep 14 01:29:07 2012
New Revision: 1384624
URL: http://svn.apache.org/viewvc?rev=1384624&view=rev
Log:
revisit broker config in 0.8; patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-325
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1384624&r1=1384623&r2=1384624&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Sep 14 01:29:07 2012
@@ -23,7 +23,7 @@ import scala.collection._
import kafka.server.KafkaConfig
import kafka.api.OffsetRequest
import kafka.log.Log._
-import kafka.common.{KafkaException, UnknownTopicOrPartitionException}
+import kafka.common.KafkaException
/**
* The guy who creates and hands out logs
@@ -95,12 +95,6 @@ private[kafka] class LogManager(val conf
* Create a log for the given topic and the given partition
*/
private def createLog(topic: String, partition: Int): Log = {
- if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
- val error = "Wrong partition %d, valid partitions (0, %d)."
- .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
- warn(error)
- throw new UnknownTopicOrPartitionException(error)
- }
logCreationLock synchronized {
val d = new File(logDir, topic + "-" + partition)
d.mkdirs()
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1384624&r1=1384623&r2=1384624&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Fri Sep 14 01:29:07 2012
@@ -34,7 +34,6 @@ import kafka.utils._
class SocketServer(val brokerId: Int,
val port: Int,
val numProcessorThreads: Int,
- val monitoringPeriodSecs: Int,
val maxQueuedRequests: Int,
val maxRequestSize: Int = Int.MaxValue) extends Logging {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1384624&r1=1384623&r2=1384624&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Sep 14 01:29:07 2012
@@ -64,9 +64,6 @@ class KafkaConfig private (val props: Ve
/* the number of queued requests allowed before blocking the network threads */
val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
- /* the interval in which to measure performance statistics */
- val monitoringPeriodSecs = props.getIntInRange("monitoring.period.secs", 600, (1, Int.MaxValue))
-
/* the default number of log partitions per topic */
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
@@ -112,9 +109,6 @@ class KafkaConfig private (val props: Ve
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
- /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
- val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", ""))
-
/* enable auto creation of topic on the server */
val autoCreateTopics = props.getBoolean("auto.create.topics", true)
@@ -143,9 +137,6 @@ class KafkaConfig private (val props: Ve
val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
- /* size of the state change request queue in Zookeeper */
- val stateChangeQSize = props.getInt("state.change.queue.size", 1000)
-
/**
* Config options relevant to a follower for a replica
*/
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1384624&r1=1384623&r2=1384624&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Sep 14 01:29:07 2012
@@ -75,7 +75,6 @@ class KafkaServer(val config: KafkaConfi
socketServer = new SocketServer(config.brokerId,
config.port,
config.numNetworkThreads,
- config.monitoringPeriodSecs,
config.numQueuedRequests,
config.maxSocketRequestSize)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1384624&r1=1384623&r2=1384624&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Sep 14 01:29:07 2012
@@ -179,33 +179,4 @@ class LogManagerTest extends JUnit3Suite
assertTrue("The last flush time has to be within defaultflushInterval of current time ",
(System.currentTimeMillis - log.getLastFlushedTime) < 100)
}
-
- @Test
- def testConfigurablePartitions() {
- val props = TestUtils.createBrokerConfig(0, -1)
- logManager.shutdown()
- config = new KafkaConfig(props) {
- override val logFileSize = 256
- override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
- override val flushInterval = 100
- }
- logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
- logManager.startup
-
- for(i <- 0 until 1) {
- val log = logManager.getOrCreateLog(name, i)
- for(i <- 0 until 250) {
- var set = TestUtils.singleMessageSet("test".getBytes())
- log.append(set)
- }
- }
-
- try
- {
- val log = logManager.getOrCreateLog(name, 2)
- assertTrue("Should not come here", log != null)
- } catch {
- case _ =>
- }
- }
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1384624&r1=1384623&r2=1384624&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Fri Sep 14 01:29:07 2012
@@ -33,7 +33,6 @@ class SocketServerTest extends JUnitSuit
val server: SocketServer = new SocketServer(0,
port = TestUtils.choosePort,
numProcessorThreads = 1,
- monitoringPeriodSecs = 30,
maxQueuedRequests = 50,
maxRequestSize = 50)
server.startup()