You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/01/13 00:45:37 UTC
kafka git commit: KAFKA-1070 Auto assign broker id;
reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk e52a6181b -> b1b80860a
KAFKA-1070 Auto assign broker id; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1b80860
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1b80860
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1b80860
Branch: refs/heads/trunk
Commit: b1b80860a01cc378cfada3549a3480f0773c3ff8
Parents: e52a618
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Mon Jan 12 15:45:13 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Jan 12 15:45:29 2015 -0800
----------------------------------------------------------------------
.../common/GenerateBrokerIdException.scala | 27 +++
.../common/InconsistentBrokerIdException.scala | 27 +++
.../kafka/log/LogCleanerManager.scala.orig | 203 +++++++++++++++++++
.../kafka/server/BrokerMetadataCheckpoint.scala | 83 ++++++++
.../main/scala/kafka/server/KafkaConfig.scala | 20 +-
.../main/scala/kafka/server/KafkaServer.scala | 87 ++++++--
core/src/main/scala/kafka/utils/ZkUtils.scala | 38 +++-
.../server/ServerGenerateBrokerIdTest.scala | 127 ++++++++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 9 +-
9 files changed, 596 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
new file mode 100644
index 0000000..13784fe
--- /dev/null
+++ b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Signals that a zookeeper sequence id, to be used as the broker id, could not be generated.
+ *
+ * @param message description of the failure; may be null
+ * @param cause   the underlying error; may be null
+ */
+class GenerateBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this() = this(null, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this(message: String) = this(message, null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
new file mode 100644
index 0000000..0c0d1cd
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Signals that the broker ids known to this broker disagree: either the ids stored in the
+ * logDirs differ from each other, or the stored id differs from the configured one.
+ *
+ * @param message description of the inconsistency; may be null
+ * @param cause   the underlying error; may be null
+ */
+class InconsistentBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this() = this(null, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this(message: String) = this(message, null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig b/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
new file mode 100644
index 0000000..e8ced6a
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import kafka.utils.{Logging, Pool}
+import kafka.server.OffsetCheckpoint
+import collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils._
+import java.util.concurrent.TimeUnit
+import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+
+/** The cleaning state of a partition as tracked by LogCleanerManager. */
+private[log] sealed trait LogCleaningState
+/** The partition is currently being cleaned. */
+private[log] case object LogCleaningInProgress extends LogCleaningState
+/** An abort has been requested for a partition that was being cleaned. */
+private[log] case object LogCleaningAborted extends LogCleaningState
+/** Cleaning is paused; the partition is not scheduled for cleaning until it is resumed. */
+private[log] case object LogCleaningPaused extends LogCleaningState
+
+/**
+ * Manage the state of each partition being cleaned.
+ * If a partition is to be cleaned, it enters the LogCleaningInProgress state.
+ * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
+ * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
+ * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
+ * requested to be resumed.
+ */
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
+
+ override val loggerName = classOf[LogCleaner].getName
+
+ /* the offset checkpoints holding the last cleaned point for each log */
+ private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+
+ /* the set of logs currently being cleaned */
+ private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
+
+ /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+ private val lock = new ReentrantLock
+
+ /* for coordinating the pausing and the cleaning of a partition */
+ private val pausedCleaningCond = lock.newCondition()
+
+ /* a gauge for tracking the cleanable ratio of the dirtiest log */
+ @volatile private var dirtiestLogCleanableRatio = 0.0
+ newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
+
+ /**
+  * Reads every cleaner offset checkpoint and merges the entries into a single map.
+  *
+  * @return the position processed for all logs.
+  */
+ def allCleanerCheckpoints(): Map[TopicAndPartition, Long] = {
+   val checkpointEntries = for {
+     checkpoint <- checkpoints.values
+     entry <- checkpoint.read()
+   } yield entry
+   checkpointEntries.toMap
+ }
+
+ /**
+  * Selects the next log to clean and records it in the in-progress set. The candidates are
+  * recomputed from the full set of logs on every call, so logs that are dynamically added to
+  * the pool maintained by the log manager are picked up.
+  *
+  * @return the log with the highest cleanable ratio among those exceeding their configured
+  *         minimum cleanable ratio, or None if there is no such log.
+  */
+ def grabFilthiestLog(): Option[LogToClean] = {
+   inLock(lock) {
+     val checkpointedOffsets = allCleanerCheckpoints()
+     val dirtyLogs = logs
+       // skip any logs marked for delete rather than dedupe
+       .filter { case (_, partitionLog) => partitionLog.config.compact }
+       // skip any logs already in-progress
+       .filterNot { case (partition, _) => inProgress.contains(partition) }
+       // create a LogToClean instance for each, starting at the checkpointed offset if there is one
+       .map { case (partition, partitionLog) =>
+         LogToClean(partition, partitionLog,
+           checkpointedOffsets.getOrElse(partition, partitionLog.logSegments.head.baseOffset))
+       }
+       // skip any empty logs
+       .filter(_.totalBytes > 0)
+     this.dirtiestLogCleanableRatio = if (dirtyLogs.isEmpty) 0 else dirtyLogs.max.cleanableRatio
+     // a log must also meet the minimum threshold for dirty byte ratio
+     val cleanableLogs = dirtyLogs.filter(candidate => candidate.cleanableRatio > candidate.log.config.minCleanableRatio)
+     if (cleanableLogs.nonEmpty) {
+       val filthiest = cleanableLogs.max
+       inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
+       Some(filthiest)
+     } else {
+       None
+     }
+   }
+ }
+
+ /**
+  * Aborts the cleaning of the given partition if it is in progress, blocking until the abort
+  * has taken effect. Implemented as an abort-and-pause immediately followed by a resume, all
+  * while holding the lock.
+  */
+ def abortCleaning(topicAndPartition: TopicAndPartition): Unit = inLock(lock) {
+   abortAndPauseCleaning(topicAndPartition)
+   resumeCleaning(topicAndPartition)
+   info("The cleaning for partition %s is aborted".format(topicAndPartition))
+ }
+
+ /**
+  * Aborts the cleaning of the given partition if it is in progress and pauses any future
+  * cleaning of it. Blocks until the partition has reached the paused state.
+  *  1. A partition that is not in progress is marked as paused directly.
+  *  2. A partition that is being cleaned is first marked as aborted.
+  *  3. The cleaner thread checks the state periodically; when it sees the aborted state it
+  *     throws a LogCleaningAbortedException to stop the cleaning task.
+  *  4. Once the cleaning task has stopped, doneCleaning() moves the partition to paused.
+  *  5. This method waits until the partition's state has changed to paused.
+  *
+  * @throws IllegalStateException if the partition is already in the aborted or paused state
+  */
+ def abortAndPauseCleaning(topicAndPartition: TopicAndPartition): Unit = {
+   inLock(lock) {
+     inProgress.get(topicAndPartition) match {
+       case None =>
+         inProgress.put(topicAndPartition, LogCleaningPaused)
+       case Some(LogCleaningInProgress) =>
+         inProgress.put(topicAndPartition, LogCleaningAborted)
+       case Some(unexpected) =>
+         throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
+           .format(topicAndPartition, unexpected))
+     }
+     // re-check the state at least every 100 ms until doneCleaning() has paused the partition
+     while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
+       pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
+     info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
+   }
+ }
+
+ /**
+  * Resumes the cleaning of a paused partition by removing it from the in-progress set.
+  *
+  * @throws IllegalStateException if the partition is not tracked, or is tracked in a state
+  *                               other than paused
+  */
+ def resumeCleaning(topicAndPartition: TopicAndPartition): Unit = {
+   inLock(lock) {
+     inProgress.get(topicAndPartition) match {
+       case Some(LogCleaningPaused) =>
+         inProgress.remove(topicAndPartition)
+       case Some(unexpected) =>
+         throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
+           .format(topicAndPartition, unexpected))
+       case None =>
+         throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
+           .format(topicAndPartition))
+     }
+   }
+   info("Compaction for partition %s is resumed".format(topicAndPartition))
+ }
+
+ /**
+  * Tells whether the given partition is tracked in exactly the expected cleaning state.
+  * The caller is expected to hold the lock while making the call.
+  *
+  * @return true if the partition is in the in-progress set with `expectedState`, false if it
+  *         is absent or in any other state.
+  */
+ def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean =
+   inProgress.get(topicAndPartition).exists(_ == expectedState)
+
+ /**
+  * Throws if the cleaning of the given partition has been marked as aborted.
+  *
+  * @throws LogCleaningAbortedException if the partition is in the aborted state
+  */
+ def checkCleaningAborted(topicAndPartition: TopicAndPartition): Unit = {
+   inLock(lock) {
+     val abortRequested = isCleaningInState(topicAndPartition, LogCleaningAborted)
+     if (abortRequested)
+       throw new LogCleaningAbortedException()
+   }
+ }
+
+ /**
+  * Completes a cleaning run for the given partition.
+  * If the run was not aborted, the end offset is saved to the checkpoint of `dataDir` and the
+  * partition is removed from the in-progress set. If it was aborted, the partition is moved to
+  * the paused state and the threads waiting for the pause are signalled.
+  *
+  * @throws IllegalStateException if the partition is tracked in the paused state
+  */
+ def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long): Unit = {
+   inLock(lock) {
+     inProgress(topicAndPartition) match {
+       case LogCleaningInProgress =>
+         val dirCheckpoint = checkpoints(dataDir)
+         val updatedOffsets = dirCheckpoint.read() + (topicAndPartition -> endOffset)
+         dirCheckpoint.write(updatedOffsets)
+         inProgress.remove(topicAndPartition)
+       case LogCleaningAborted =>
+         inProgress.put(topicAndPartition, LogCleaningPaused)
+         pausedCleaningCond.signalAll()
+       case unexpected =>
+         throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, unexpected))
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
new file mode 100644
index 0000000..0e542ff
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io._
+import java.util.Properties
+import kafka.utils._
+
+
+/** The metadata persisted for a broker; currently only its broker id. */
+case class BrokerMetadata(brokerId: Int)
+
+/**
+ * Saves a broker's metadata to a properties file and reads it back.
+ * The file holds a "version" entry (currently 0) and the "broker.id". Writes go to a sibling
+ * ".tmp" file first, which is then renamed over the target file.
+ *
+ * @param file the properties file backing this checkpoint
+ */
+class BrokerMetadataCheckpoint(val file: File) extends Logging {
+  private val lock = new Object()
+  // single definition of the temp file, so that cleanup and write always agree on its path
+  private val tempFile = new File(file.getAbsolutePath + ".tmp")
+  tempFile.delete() // try to delete any existing temp files for cleanliness
+
+  /**
+   * Writes the given metadata to the checkpoint file, replacing any previous content.
+   *
+   * @param brokerMetadata the metadata to persist
+   * @throws IOException if the temp file cannot be written or cannot be renamed to the target
+   */
+  def write(brokerMetadata: BrokerMetadata): Unit = {
+    lock synchronized {
+      try {
+        val brokerMetaProps = new Properties()
+        brokerMetaProps.setProperty("version", 0.toString)
+        brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
+        val fileOutputStream = new FileOutputStream(tempFile)
+        try {
+          brokerMetaProps.store(fileOutputStream, "")
+          fileOutputStream.flush()
+          // force the content to disk before the rename makes it visible under the final name
+          fileOutputStream.getFD().sync()
+        } finally {
+          // always release the file handle, also when store/flush/sync fail
+          fileOutputStream.close()
+        }
+        // swap new BrokerMetadata file with previous one
+        if (!tempFile.renameTo(file)) {
+          // renameTo() fails on windows if destination file exists.
+          file.delete()
+          if (!tempFile.renameTo(file))
+            throw new IOException("File rename from %s to %s failed.".format(tempFile.getAbsolutePath(), file.getAbsolutePath()))
+        }
+      } catch {
+        case ie: IOException =>
+          error("Failed to write meta.properties due to ", ie)
+          throw ie
+      }
+    }
+  }
+
+  /**
+   * Reads the metadata from the checkpoint file.
+   *
+   * @return the stored metadata, or None if the checkpoint file does not exist
+   * @throws IOException if the file has an unrecognized version; any other failure while
+   *                     loading or validating the properties is logged and rethrown as is
+   */
+  def read(): Option[BrokerMetadata] = {
+    lock synchronized {
+      try {
+        val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))
+        val version = brokerMetaProps.getIntInRange("version", (0, Int.MaxValue))
+        version match {
+          case 0 =>
+            val brokerId = brokerMetaProps.getIntInRange("broker.id", (0, Int.MaxValue))
+            Some(BrokerMetadata(brokerId))
+          case _ =>
+            throw new IOException("Unrecognized version of the server meta.properties file: " + version)
+        }
+      } catch {
+        case e: FileNotFoundException =>
+          // a missing file is expected on first start, so it is only a warning
+          warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
+          None
+        case e1: Exception =>
+          error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))
+          throw e1
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e26c54..bbd3fd7 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
private def getLogRetentionTimeMillis(): Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
-
+
if(props.containsKey("log.retention.ms")){
props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
}
else if(props.containsKey("log.retention.minutes")){
millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
- }
+ }
else {
millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
}
@@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
private def getLogRollTimeMillis(): Long = {
val millisInHour = 60L * 60L * 1000L
-
+
if(props.containsKey("log.roll.ms")){
props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
}
@@ -71,8 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/*********** General Configuration ***********/
- /* the broker id for this server */
- val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
+ /* Max number that can be used for a user configured broker.id (reserved.broker.max.id, default 1000) */
+ val MaxReservedBrokerId = props.getIntInRange("reserved.broker.max.id", 1000, (0, Int.MaxValue))
+
+ /* The broker id for this server, or -1 if broker.id is not configured.
+ * A configured broker.id is validated against the range (0, MaxReservedBrokerId).
+ * To avoid conflicts between a zookeeper generated brokerId and a user configured one,
+ * generated ids are the zookeeper sequence id offset by MaxReservedBrokerId.
+ * This is a var because KafkaServer assigns the resolved broker id to it during startup.
+ */
+ var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, MaxReservedBrokerId)) else -1
/* the maximum size of message that the server can receive */
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
@@ -117,10 +123,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the maximum number of bytes in a socket request */
val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
-
+
/* the maximum number of connections we allow from each ip address */
val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
-
+
/* per-ip or hostname overrides to the default maximum number of connections */
val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1691ad7..a069eb9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,11 +25,12 @@ import kafka.utils._
import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File
+import collection.mutable
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.Broker
import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
import kafka.network.{Receive, BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
@@ -39,10 +40,11 @@ import com.yammer.metrics.core.Gauge
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
- this.logIdent = "[Kafka Server " + config.brokerId + "], "
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
private var startupComplete = new AtomicBoolean(false)
+ private var brokerId: Int = -1
+
val brokerState: BrokerState = new BrokerState
val correlationId: AtomicInteger = new AtomicInteger(0)
var socketServer: SocketServer = null
@@ -56,6 +58,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var zkClient: ZkClient = null
+ val brokerMetaPropsFile = "meta.properties"
+ val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
newGauge(
"BrokerState",
@@ -77,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
/* start scheduler */
kafkaScheduler.startup()
-
+
/* setup zookeeper */
zkClient = initZk()
@@ -85,6 +89,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
logManager = createLogManager(zkClient, brokerState)
logManager.startup()
+ /* generate brokerId */
+ config.brokerId = getBrokerId
+ this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
@@ -104,26 +112,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
offsetManager = createOffsetManager()
kafkaController = new KafkaController(config, zkClient, brokerState)
-
+
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
-
+
Mx4jLoader.maybeLoad()
replicaManager.startup()
kafkaController.startup()
-
+
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
-
+
/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
-
registerStats()
startupComplete.set(true)
info("started")
@@ -181,10 +188,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
info("Starting controlled shutdown")
var channel : BlockingChannel = null
var prevController : Broker = null
- var shutdownSuceeded : Boolean = false
+ var shutdownSucceeded : Boolean = false
try {
brokerState.newState(PendingControlledShutdown)
- while (!shutdownSuceeded && remainingRetries > 0) {
+ while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
@@ -223,7 +230,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.size == 0) {
- shutdownSuceeded = true
+ shutdownSucceeded = true
info ("Controlled shutdown succeeded")
}
else {
@@ -239,7 +246,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// ignore and try again
}
}
- if (!shutdownSuceeded) {
+ if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
@@ -251,7 +258,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
channel = null
}
}
- if (!shutdownSuceeded) {
+ if (!shutdownSucceeded) {
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}
@@ -307,7 +314,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
-
+
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = config.logRollTimeMillis,
@@ -359,5 +366,55 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
}
-}
+ /**
+  * Resolves the broker id from the config, the meta.properties files in the log.dirs, or a
+  * newly generated Zookeeper sequence id:
+  * <ol>
+  * <li> the broker.id stored in meta.properties differs between log.dirs: throws InconsistentBrokerIdException
+  * <li> config has a broker.id and meta.properties has a different one: throws InconsistentBrokerIdException
+  * <li> meta.properties has a broker.id that does not conflict with the config: the stored id is used
+  * <li> config has no broker.id and none is stored: a broker.id is generated from Zookeeper's sequence
+  * <li> config has a broker.id and none is stored: the configured id is used
+  * </ol>
+  * The resolved id is then written to meta.properties in every log.dir that had no such file.
+  *
+  * @return the resolved brokerId.
+  */
+ private def getBrokerId: Int = {
+   val configuredId = config.brokerId
+   // read the checkpoint of every log dir once, in config order
+   val storedMetadata = config.logDirs.map(logDir => (logDir, brokerMetadataCheckpoints(logDir).read()))
+   val storedIds = storedMetadata.collect { case (_, Some(metadata)) => metadata.brokerId }.toSet
+   val logDirsWithoutMetaProps = storedMetadata.collect { case (logDir, None) => logDir }
+
+   val resolvedId =
+     if (storedIds.size > 1)
+       throw new InconsistentBrokerIdException("Failed to match brokerId across logDirs")
+     else storedIds.headOption match {
+       case Some(storedId) if configuredId >= 0 && storedId != configuredId =>
+         throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored brokerId %s in meta.properties".format(configuredId, storedId))
+       case Some(storedId) => storedId // pick broker.id from meta.properties
+       case None if configuredId < 0 => generateBrokerId // generate a new brokerId from Zookeeper
+       case None => configuredId
+     }
+
+   logDirsWithoutMetaProps.foreach { logDir =>
+     brokerMetadataCheckpoints(logDir).write(BrokerMetadata(resolvedId))
+   }
+
+   resolvedId
+ }
+ /**
+  * Obtains a new broker id from ZkUtils.getBrokerSequenceId, offset by config.MaxReservedBrokerId.
+  *
+  * @throws GenerateBrokerIdException wrapping any Exception raised while obtaining the id
+  */
+ private def generateBrokerId: Int =
+   try ZkUtils.getBrokerSequenceId(zkClient, config.MaxReservedBrokerId)
+   catch {
+     case cause: Exception =>
+       error("Failed to generate broker.id due to ", cause)
+       throw new GenerateBrokerIdException("Failed to generate broker.id", cause)
+   }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88..c14bd45 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -46,6 +46,7 @@ object ZkUtils extends Logging {
val ReassignPartitionsPath = "/admin/reassign_partitions"
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+ val BrokerSequenceIdPath = "/brokers/seqid"
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
@@ -87,7 +88,8 @@ object ZkUtils extends Logging {
}
def setupCommonPaths(zkClient: ZkClient) {
- for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
+ for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
+ DeleteTopicsPath, BrokerSequenceIdPath))
makeSurePersistentPathExists(zkClient, path)
}
@@ -122,6 +124,14 @@ object ZkUtils extends Logging {
}
}
+ /**
+  * Returns a broker id derived from the sequence id kept at BrokerSequenceIdPath in Zk.
+  * Users can also provide a brokerId in the config; in order to avoid conflicts between the
+  * zk generated seqId and config.brokerId, the zk seqId is incremented by MaxReservedBrokerId.
+  */
+ def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = {
+   val sequenceId = getSequenceId(zkClient, BrokerSequenceIdPath)
+   sequenceId + MaxReservedBrokerId
+ }
+
/**
* Gets the in-sync replicas (ISR) for a specific topic and partition
*/
@@ -696,6 +706,32 @@ object ZkUtils extends Logging {
}
}
+ /**
+  * Produces a sequence number by creating / updating the given path in zookeeper.
+  * Every time a client updates the path, its stat.version gets incremented; the version
+  * returned by zookeeper for this call's update is used as the sequence number.
+  * If the path does not exist yet, it is created together with its parent path and 0 is
+  * returned. If another client creates it at the same moment, the path is updated once more
+  * and the resulting version is returned.
+  *
+  * @param client the zookeeper client to use
+  * @param path   the node whose data version serves as the sequence
+  * @return the version of the node after this call's update, or 0 if this call created the node
+  */
+ def getSequenceId(client: ZkClient, path: String): Int = {
+   // expected version -1 lets the write succeed regardless of the node's current version
+   def updateAndGetVersion(): Int = client.writeDataReturnStat(path, "", -1).getVersion
+
+   try {
+     updateAndGetVersion()
+   } catch {
+     case e: ZkNoNodeException =>
+       // create the node that was asked for, not a hard-coded one
+       createParentPath(client, path)
+       try {
+         client.createPersistent(path, "")
+         0
+       } catch {
+         // lost the creation race against another client: the node exists now, so update it
+         case e: ZkNodeExistsException => updateAndGetVersion()
+       }
+   }
+ }
+
def getAllTopics(zkClient: ZkClient): Seq[String] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
if(topics == null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
new file mode 100644
index 0000000..cf2dd94
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils}
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+import java.io.File
+
+/**
+ * Tests for broker id handling at server startup: ids that are generated because the config
+ * carries no broker.id, ids supplied by the user, and the meta.properties file that records
+ * the broker id in each log directory.
+ */
+class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
+  // created with nodeId -1, i.e. the broker id is expected to be generated
+  var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
+  var config1 = new KafkaConfig(props1)
+  // created with nodeId 0, i.e. the broker id is specified by the user
+  var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
+  var config2 = new KafkaConfig(props2)
+  val brokerMetaPropsFile = "meta.properties"
+
+
+  /** A generated broker id is persisted and picked up again when the server is restarted. */
+  @Test
+  def testAutoGenerateBrokerId() {
+    var server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
+    // restart the server and check that it uses the brokerId generated previously
+    server1 = new KafkaServer(config1)
+    server1.startup()
+    assertEquals(1001, server1.config.brokerId)
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  /** Generated and user configured broker ids can be used side by side. */
+  @Test
+  def testUserConfigAndGeneratedBrokerId() {
+    // start the server with broker.id as part of config
+    val server1 = new KafkaServer(config1)
+    val server2 = new KafkaServer(config2)
+    val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
+    val config3 = new KafkaConfig(props3)
+    val server3 = new KafkaServer(config3)
+    server1.startup()
+    assertEquals(1001, server1.config.brokerId)
+    server2.startup()
+    assertEquals(0, server2.config.brokerId)
+    server3.startup()
+    assertEquals(1002, server3.config.brokerId)
+    server1.shutdown()
+    server2.shutdown()
+    server3.shutdown()
+    assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001))
+    assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0))
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002))
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  /** The generated broker id is stored in every log dir, including ones added later. */
+  @Test
+  def testMultipleLogDirsMetaProps() {
+    // add multiple logDirs and check if the generated brokerId is stored in all of them
+    val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath +
+      "," + TestUtils.tempDir().getAbsolutePath
+    props1.setProperty("log.dir",logDirs)
+    config1 = new KafkaConfig(props1)
+    var server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
+    // an addition to log.dirs after generation of a broker.id from zk should be copied over
+    val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
+    props1.setProperty("log.dir",newLogDirs)
+    config1 = new KafkaConfig(props1)
+    server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  /**
+   * Starts a server with a generated broker id and afterwards one with a user specified id;
+   * an InconsistentBrokerIdException raised by the second startup is tolerated.
+   */
+  @Test
+  def testConsistentBrokerIdFromUserConfigAndMetaProps() {
+    // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException
+    var server1 = new KafkaServer(config1) //auto generate broker Id
+    server1.startup()
+    server1.shutdown()
+    server1 = new KafkaServer(config2) // user specified broker id
+    try {
+      // NOTE(review): the test also passes when startup() does not throw - confirm whether
+      // the exception is actually expected here
+      server1.startup()
+    } catch {
+      case e: kafka.common.InconsistentBrokerIdException => //success
+    }
+    server1.shutdown()
+    // server1 now refers to the second server, so clean up the first server's dirs as well
+    Utils.rm(config1.logDirs)
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  /**
+   * Checks the meta.properties file of each given log dir.
+   *
+   * @param logDirs  log directories to inspect
+   * @param brokerId broker id every meta.properties file is expected to contain
+   * @return true if every log dir has readable broker metadata with the given id,
+   *         false as soon as one is missing or holds a different id
+   */
+  def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
+    logDirs.forall { logDir =>
+      val checkpoint = new BrokerMetadataCheckpoint(
+        new File(logDir + File.separator + brokerMetaPropsFile))
+      checkpoint.read() match {
+        case Some(brokerMetadata: BrokerMetadata) => brokerMetadata.brokerId == brokerId
+        case _ => false
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c9e8ba2..ac15d34 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -94,7 +94,7 @@ object TestUtils extends Logging {
Utils.rm(f)
}
})
-
+
f
}
@@ -154,7 +154,7 @@ object TestUtils extends Logging {
def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
enableControlledShutdown: Boolean = true): Properties = {
val props = new Properties
- props.put("broker.id", nodeId.toString)
+ if (nodeId >= 0) props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@@ -700,6 +700,11 @@ object TestUtils extends Logging {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
}
+  /**
+   * Asserts that there is no live, non-daemon thread whose class has a canonical name
+   * starting with "kafka" (compared in lower case).
+   */
+  def verifyNonDaemonThreadsStatus() {
+    // getCanonicalName returns null for anonymous and local classes; wrap it in Option so
+    // such threads are treated as non-matching instead of causing a NullPointerException
+    def isKafkaThread(t: Thread): Boolean =
+      Option(t.getClass.getCanonicalName).exists(_.toLowerCase.startsWith("kafka"))
+    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && isKafkaThread(t)))
+  }
/**
* Create new LogManager instance with default configuration for testing