You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 00:51:01 UTC
svn commit: r1367811 [1/4] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/network/ main/sca...
Author: junrao
Date: Tue Jul 31 22:50:59 2012
New Revision: 1367811
URL: http://svn.apache.org/viewvc?rev=1367811&view=rev
Log:
Revert the KAFKA-343 commit due to unit test failures
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
- copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
- copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
- copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
- copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
- copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Jul 31 22:50:59 2012
@@ -91,10 +91,10 @@ object AdminUtils extends Logging {
topics.map { topic =>
if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
- val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+ val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt )
val partitionMetadata = sortedPartitions.map { partitionMap =>
- val partition = partitionMap._1
+ val partition = partitionMap._1.toInt
val replicas = partitionMap._2
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Tue Jul 31 22:50:59 2012
@@ -25,54 +25,39 @@ import collection.mutable.HashMap
object LeaderAndISR {
- val initialLeaderEpoch: Int = 0
- val initialZKVersion: Int = 0
def readFrom(buffer: ByteBuffer): LeaderAndISR = {
val leader = buffer.getInt
val leaderGenId = buffer.getInt
val ISRString = Utils.readShortString(buffer, "UTF-8")
val ISR = ISRString.split(",").map(_.toInt).toList
- val zkVersion = buffer.getInt
+ val zkVersion = buffer.getLong
new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
}
}
-case class LeaderAndISR(var leader: Int, var leaderEpoch: Int, var ISR: List[Int], var zkVersion: Int){
- def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoch, ISR, LeaderAndISR.initialZKVersion)
-
+case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){
def writeTo(buffer: ByteBuffer) {
buffer.putInt(leader)
- buffer.putInt(leaderEpoch)
+ buffer.putInt(leaderEpoc)
Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
- buffer.putInt(zkVersion)
+ buffer.putLong(zkVersion)
}
def sizeInBytes(): Int = {
- val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4
+ val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8
size
}
-
- override def toString(): String = {
- val jsonDataMap = new HashMap[String, String]
- jsonDataMap.put("leader", leader.toString)
- jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
- jsonDataMap.put("ISR", ISR.mkString(","))
- Utils.stringMapToJsonString(jsonDataMap)
- }
}
object LeaderAndISRRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
- val IsInit: Boolean = true
- val NotInit: Boolean = false
- val DefaultAckTimeout: Int = 1000
def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
- val isInit = if(buffer.get() == 1.toByte) true else false
+ val isInit = buffer.get()
val ackTimeoutMs = buffer.getInt
val leaderAndISRRequestCount = buffer.getInt
val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
@@ -91,18 +76,19 @@ object LeaderAndISRRequest {
case class LeaderAndISRRequest (versionId: Short,
clientId: String,
- isInit: Boolean,
+ isInit: Byte,
ackTimeoutMs: Int,
- leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
+ leaderAndISRInfos:
+ Map[(String, Int), LeaderAndISR])
extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
- def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
- this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos)
+ def this(isInit: Byte, ackTimeoutMs: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
+ this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeoutMs, leaderAndISRInfos)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
- buffer.put(if(isInit) 1.toByte else 0.toByte)
+ buffer.put(isInit)
buffer.putInt(ackTimeoutMs)
buffer.putInt(leaderAndISRInfos.size)
for((key, value) <- leaderAndISRInfos){
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Tue Jul 31 22:50:59 2012
@@ -26,7 +26,6 @@ import collection.mutable.Set
object StopReplicaRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
- val DefaultAckTimeout = 100
def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
val versionId = buffer.getShort
@@ -44,10 +43,10 @@ object StopReplicaRequest {
case class StopReplicaRequest(versionId: Short,
clientId: String,
ackTimeoutMs: Int,
- stopReplicaSet: Set[(String, Int)])
- extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
- def this(stopReplicaSet: Set[(String, Int)]) = {
- this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
+ stopReplicaSet: Set[(String, Int)]
+ ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+ def this(ackTimeoutMs: Int, stopReplicaSet: Set[(String, Int)]) = {
+ this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeoutMs, stopReplicaSet)
}
def writeTo(buffer: ByteBuffer) {
@@ -68,4 +67,4 @@ case class StopReplicaRequest(versionId:
}
size
}
-}
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Tue Jul 31 22:50:59 2012
@@ -19,7 +19,7 @@ package kafka.cluster
import kafka.utils.Utils._
import java.nio.ByteBuffer
-import kafka.common.BrokerNotExistException
+import kafka.common.KafkaException
/**
* A Kafka broker
@@ -28,7 +28,7 @@ private[kafka] object Broker {
def createBroker(id: Int, brokerInfoString: String): Broker = {
if(brokerInfoString == null)
- throw new BrokerNotExistException("Broker id %s does not exist".format(id))
+ throw new KafkaException("Broker id %s does not exist".format(id))
val brokerInfo = brokerInfoString.split(":")
new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Tue Jul 31 22:50:59 2012
@@ -16,12 +16,11 @@
*/
package kafka.cluster
+import kafka.utils.{SystemTime, Time, Logging}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils._
import java.util.concurrent.locks.ReentrantLock
-import scala.collection._
-import kafka.utils.{ZkUtils, SystemTime, Time}
import kafka.common.{KafkaException, LeaderNotAvailableException}
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.Logging
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -120,8 +119,14 @@ class Partition(val topic: String,
}
def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) {
- try{
+ try {
leaderISRUpdateLock.lock()
+ zkClientOpt match {
+ case Some(zkClient) =>
+ // update ISR in ZK
+ updateISRInZk(newISR, zkClient)
+ case None =>
+ }
// update partition's ISR in cache
inSyncReplicas = newISR.map {r =>
getReplica(r) match {
@@ -130,22 +135,29 @@ class Partition(val topic: String,
}
}
info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
- if(zkClientOpt.isDefined){
- val zkClient = zkClientOpt.get
- val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
- curLeaderAndISR match {
- case None =>
- throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
- case Some(m) =>
- m.ISR = newISR.toList
- ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
- }
- }
- } finally {
+ }catch {
+ case e => throw new KafkaException("Failed to update ISR for topic %s ".format(topic) +
+ "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
+ }finally {
leaderISRUpdateLock.unlock()
}
}
+ private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
+ val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
+ if(replicaListAndEpochString == null) {
+ throw new LeaderNotAvailableException(("Illegal partition state. ISR cannot be updated for topic " +
+ "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
+ }
+ else {
+ val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+ val epoch = replicasAndEpochInfo.last
+ updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
+ "%s;%s".format(newISR.mkString(","), epoch))
+ info("Updated ISR for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
+ }
+ }
+
override def equals(that: Any): Boolean = {
if(!(that.isInstanceOf[Partition]))
return false
@@ -168,4 +180,4 @@ class Partition(val topic: String,
partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
partitionString.toString()
}
-}
\ No newline at end of file
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Tue Jul 31 22:50:59 2012
@@ -39,8 +39,7 @@ object ErrorMapping {
val NotLeaderForPartitionCode : Short = 7
val UnknownTopicCode : Short = 8
val RequestTimedOutCode: Short = 9
- val BrokerNotExistInZookeeperCode: Short = 10
- val ReplicaNotAvailableCode: Short = 11
+ val ReplicaNotAvailableCode: Short = 10
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -52,9 +51,8 @@ object ErrorMapping {
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
- classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode,
- classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
- classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
+ classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
+ classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Tue Jul 31 22:50:59 2012
@@ -88,7 +88,8 @@ class ConsumerFetcherManager(private val
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
- new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, this)
+ new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+ config, sourceBroker, this)
}
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Tue Jul 31 22:50:59 2012
@@ -66,7 +66,7 @@ private[kafka] object TopicCount extends
consumerId: String,
zkClient: ZkClient) : TopicCount = {
val dirs = new ZKGroupDirs(group)
- val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
+ val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Jul 31 22:50:59 2012
@@ -296,7 +296,7 @@ private[kafka] class ZookeeperConsumerCo
try {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
val znode = topicDirs.consumerOffsetDir + "/" + partitionId
- val offsetString = readDataMaybeNull(zkClient, znode)._1
+ val offsetString = readDataMaybeNull(zkClient, znode)
if (offsetString != null)
return offsetString.toLong
else
@@ -416,7 +416,7 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
+ private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
val topicDirs = new ZKGroupTopicDirs(group, topic)
val znode = topicDirs.consumerOwnerDir + "/" + partition
deletePath(zkClient, znode)
@@ -427,7 +427,7 @@ private[kafka] class ZookeeperConsumerCo
info("Releasing partition ownership")
for ((topic, infos) <- localTopicRegistry) {
for(partition <- infos.keys)
- deletePartitionOwnershipFromZK(topic, partition)
+ deletePartitionOwnershipFromZK(topic, partition.toString)
localTopicRegistry.remove(topic)
}
}
@@ -484,7 +484,7 @@ private[kafka] class ZookeeperConsumerCo
releasePartitionOwnership(topicRegistry)
- var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+ var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
@@ -492,7 +492,7 @@ private[kafka] class ZookeeperConsumerCo
val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
- var curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+ var curPartitions: Seq[String] = partitionsPerTopicMap.get(topic).get
val nPartsPerConsumer = curPartitions.size / curConsumers.size
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
@@ -602,13 +602,13 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, Int), String]): Boolean = {
- var successfullyOwnedPartitions : List[(String, Int)] = Nil
+ private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+ var successfullyOwnedPartitions : List[(String, String)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1._1
val partition = partitionOwner._1._2
val consumerThreadId = partitionOwner._2
- val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
+ val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic,partition)
try {
createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
@@ -633,29 +633,29 @@ private[kafka] class ZookeeperConsumerCo
}
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
- topicDirs: ZKGroupTopicDirs, partition: Int,
+ topicDirs: ZKGroupTopicDirs, partition: String,
topic: String, consumerThreadId: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)
// find the leader for this partition
- val leaderOpt = getLeaderForPartition(zkClient, topic, partition)
+ val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt)
leaderOpt match {
- case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
+ case None => throw new NoBrokersForPartitionException("No leader available for partition %s on topic %s".
format(partition, topic))
- case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
+ case Some(l) => debug("Leader for partition %s for topic %s is %d".format(partition, topic, l))
}
val leader = leaderOpt.get
val znode = topicDirs.consumerOffsetDir + "/" + partition
- val offsetString = readDataMaybeNull(zkClient, znode)._1
+ val offsetString = readDataMaybeNull(zkClient, znode)
// If first time starting a consumer, set the initial offset based on the config
var offset : Long = 0L
if (offsetString == null)
offset = config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
- earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
+ earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.EarliestTime)
case OffsetRequest.LargestTimeString =>
- earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
+ earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.LatestTime)
case _ =>
throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
}
@@ -666,12 +666,12 @@ private[kafka] class ZookeeperConsumerCo
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
leader,
- partition,
+ partition.toInt,
queue,
consumedOffset,
fetchedOffset,
new AtomicInteger(config.fetchSize))
- partTopicInfoMap.put(partition, partTopicInfo)
+ partTopicInfoMap.put(partition.toInt, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -29,6 +29,7 @@ import kafka.common.{KafkaException, Inv
object Log {
val FileSuffix = ".kafka"
+ val hwFileName = "highwatermark"
/**
* Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -114,8 +115,8 @@ class LogSegment(val file: File, val mes
* An append-only log for storing messages.
*/
@threadsafe
-private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, brokerId: Int = 0) extends Logging {
- this.logIdent = "Kafka Log on Broker " + brokerId + ", "
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+ extends Logging {
import kafka.log.Log._
@@ -125,7 +126,7 @@ private[kafka] class Log( val dir: File,
/* The current number of unflushed messages appended to the write */
private val unflushed = new AtomicInteger(0)
- /* last time it was flushed */
+ /* last time it was flushed */
private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
/* The actual segments of the log */
@@ -190,7 +191,8 @@ private[kafka] class Log( val dir: File,
val curr = segments.get(i)
val next = segments.get(i+1)
if(curr.start + curr.size != next.start)
- throw new KafkaException("The following segments don't validate: " + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
+ throw new KafkaException("The following segments don't validate: " +
+ curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
}
}
}
@@ -229,12 +231,13 @@ private[kafka] class Log( val dir: File,
BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
logStats.recordAppendedMessages(numberOfMessages)
-
+
// truncate the message set's buffer upto validbytes, before appending it to the on-disk log
val validByteBuffer = messages.getBuffer.duplicate()
val messageSetValidBytes = messages.validBytes
if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
- throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+ throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+ " Message set cannot be appended to log. Possible causes are corrupted produce requests")
validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
val validMessages = new ByteBufferMessageSet(validByteBuffer)
@@ -344,11 +347,12 @@ private[kafka] class Log( val dir: File,
if (unflushed.get == 0) return
lock synchronized {
- debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + System.currentTimeMillis)
+ debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+ System.currentTimeMillis)
segments.view.last.messageSet.flush()
unflushed.set(0)
lastflushedTime.set(System.currentTimeMillis)
- }
+ }
}
def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
@@ -371,15 +375,15 @@ private[kafka] class Log( val dir: File,
case OffsetRequest.EarliestTime =>
startIndex = 0
case _ =>
- var isFound = false
- debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
- startIndex = offsetTimeArray.length - 1
- while (startIndex >= 0 && !isFound) {
- if (offsetTimeArray(startIndex)._2 <= request.time)
- isFound = true
- else
- startIndex -=1
- }
+ var isFound = false
+ debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+ startIndex = offsetTimeArray.length - 1
+ while (startIndex >= 0 && !isFound) {
+ if (offsetTimeArray(startIndex)._2 <= request.time)
+ isFound = true
+ else
+ startIndex -=1
+ }
}
val retSize = request.maxNumOffsets.min(startIndex + 1)
@@ -404,13 +408,7 @@ private[kafka] class Log( val dir: File,
}
}
-
- def deleteWholeLog():Unit = {
- deleteSegments(segments.contents.get())
- Utils.rm(dir)
- }
-
- /* Attempts to delete all provided segments from a log and returns how many it was able to */
+ /* Attemps to delete all provided segments from a log and returns how many it was able to */
def deleteSegments(segments: Seq[LogSegment]): Int = {
var total = 0
for(segment <- segments) {
@@ -426,27 +424,30 @@ private[kafka] class Log( val dir: File,
}
def truncateTo(targetOffset: Long) {
- // find the log segment that has this hw
- val segmentToBeTruncated = segments.view.find(
- segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
-
- segmentToBeTruncated match {
- case Some(segment) =>
- val truncatedSegmentIndex = segments.view.indexOf(segment)
- segments.truncLast(truncatedSegmentIndex)
- segment.truncateTo(targetOffset)
- info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
- case None =>
- assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
- error("Cannot truncate log to %d since the log start offset is %d and end offset is %d".format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
- }
+ // find the log segment that has this hw
+ val segmentToBeTruncated = segments.view.find(segment =>
+ targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
+
+ segmentToBeTruncated match {
+ case Some(segment) =>
+ val truncatedSegmentIndex = segments.view.indexOf(segment)
+ segments.truncLast(truncatedSegmentIndex)
+ segment.truncateTo(targetOffset)
+ info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
+ case None =>
+ assert(targetOffset <= segments.view.last.absoluteEndOffset,
+ "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
+ format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+ error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
+ .format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
+ }
- val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
- if(segmentsToBeDeleted.size < segments.view.size) {
+ val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+ if(segmentsToBeDeleted.size < segments.view.size) {
val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
if(numSegmentsDeleted != segmentsToBeDeleted.size)
error("Failed to delete some segments during log recovery")
- }
+ }
}
def topicName():String = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Jul 31 22:50:59 2012
@@ -1,11 +1,11 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -35,7 +35,7 @@ private[kafka] class LogManager(val conf
val logCleanupIntervalMs: Long,
val logCleanupDefaultAgeMs: Long,
needRecovery: Boolean) extends Logging {
-
+
val logDir: File = new File(config.logDir)
private val numPartitions = config.numPartitions
private val maxSize: Long = config.logFileSize
@@ -44,7 +44,6 @@ private[kafka] class LogManager(val conf
private val logFlushIntervals = config.flushIntervalMap
private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRetentionSize = config.logRetentionSize
- this.logIdent = "Log Manager on Broker " + config.brokerId + ", "
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@@ -61,11 +60,11 @@ private[kafka] class LogManager(val conf
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
- val log = new Log(dir, maxSize, flushInterval, needRecovery, config.brokerId)
- val topicPartition = Utils.getTopicPartition(dir.getName)
- logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
- val parts = logs.get(topicPartition._1)
- parts.put(topicPartition._2, log)
+ val log = new Log(dir, maxSize, flushInterval, needRecovery)
+ val topicPartion = Utils.getTopicPartition(dir.getName)
+ logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
+ val parts = logs.get(topicPartion._1)
+ parts.put(topicPartion._2, log)
}
}
}
@@ -79,9 +78,9 @@ private[kafka] class LogManager(val conf
info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
info("Starting log flusher every " + config.flushSchedulerThreadRate +
- " ms with the following overrides " + logFlushIntervals)
+ " ms with the following overrides " + logFlushIntervals)
scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
- config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+ config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
}
}
@@ -94,14 +93,14 @@ private[kafka] class LogManager(val conf
throw new InvalidTopicException("Topic name can't be emtpy")
if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
val error = "Wrong partition %d, valid partitions (0, %d)."
- .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+ .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
warn(error)
throw new InvalidPartitionException(error)
}
logCreationLock synchronized {
val d = new File(logDir, topic + "-" + partition)
d.mkdirs()
- new Log(d, maxSize, flushInterval, false, config.brokerId)
+ new Log(d, maxSize, flushInterval, false)
}
}
@@ -196,19 +195,18 @@ private[kafka] class LogManager(val conf
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
- debug("Log cleanup completed. " + total + " files deleted in " +
- (time.milliseconds - startMs) / 1000 + " seconds")
+ debug("Log cleanup completed. " + total + " files deleted in " +
+ (time.milliseconds - startMs) / 1000 + " seconds")
}
-
+
/**
* Close all the logs
*/
def shutdown() {
- info("shut down")
+ info("Closing log manager")
allLogs.foreach(_.close())
- info("shutted down completedly")
}
-
+
/**
* Get all the partition logs
*/
@@ -224,7 +222,7 @@ private[kafka] class LogManager(val conf
if(logFlushIntervals.contains(log.topicName))
logFlushInterval = logFlushIntervals(log.topicName)
debug(log.topicName + " flush interval " + logFlushInterval +
- " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
+ " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= logFlushInterval)
log.flush
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Jul 31 22:50:59 2012
@@ -31,13 +31,12 @@ import kafka.utils._
* N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
-class SocketServer(val brokerId: Int,
- val port: Int,
+class SocketServer(val port: Int,
val numProcessorThreads: Int,
val monitoringPeriodSecs: Int,
val maxQueuedRequests: Int,
val maxRequestSize: Int = Int.MaxValue) extends Logging {
- this.logIdent = "Socket Server on Broker " + brokerId + ", "
+
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
private var acceptor: Acceptor = new Acceptor(port, processors)
@@ -58,18 +57,18 @@ class SocketServer(val brokerId: Int,
// start accepting connections
Utils.newThread("kafka-acceptor", acceptor, false).start()
acceptor.awaitStartup
- info("started")
+ info("Kafka socket server started")
}
/**
* Shutdown the socket server
*/
def shutdown() = {
- info("shutting down")
+ info("Shutting down socket server")
acceptor.shutdown
for(processor <- processors)
processor.shutdown
- info("shutted down completely")
+ info("Shut down socket server complete")
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Tue Jul 31 22:50:59 2012
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
// map of (source brokerid, fetcher Id per source broker) => fetcher
- private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
+ private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = name + " "
@@ -52,7 +52,7 @@ abstract class AbstractFetcherManager(pr
}
def removeFetcher(topic: String, partitionId: Int) {
- info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
+ info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId))
mapLock synchronized {
for ((key, fetcher) <- fetcherThreadMap) {
fetcher.removePartition(topic, partitionId)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Jul 31 22:50:59 2012
@@ -25,29 +25,24 @@ import kafka.common._
import kafka.log._
import kafka.message._
import kafka.network._
+import kafka.utils.{SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
import scala.math._
import kafka.network.RequestChannel.Response
-import kafka.utils.{ZkUtils, SystemTime, Logging}
-import kafka.cluster.Replica
/**
* Logic to handle the various Kafka requests
*/
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
- val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper,
- addReplicaCbk: (String, Int, Set[Int]) => Replica,
- stopReplicaCbk: (String, Int) => Short,
- becomeLeader: (Replica, LeaderAndISR) => Short,
- becomeFollower: (Replica, LeaderAndISR) => Short,
- brokerId: Int) extends Logging {
+class KafkaApis(val requestChannel: RequestChannel,
+ val logManager: LogManager,
+ val replicaManager: ReplicaManager,
+ val kafkaZookeeper: KafkaZooKeeper) extends Logging {
- private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId)
- private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+ private val produceRequestPurgatory = new ProducerRequestPurgatory
+ private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
private val requestLogger = Logger.getLogger("kafka.request.logger")
- this.logIdent = "KafkaApi on Broker " + brokerId + ", "
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -67,46 +62,12 @@ class KafkaApis(val requestChannel: Requ
def handleLeaderAndISRRequest(request: RequestChannel.Request){
- val responseMap = new HashMap[(String, Int), Short]
val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
- info("handling leader and isr request " + leaderAndISRRequest)
-
- for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
- var errorCode = ErrorMapping.NoError
- val topic = partitionInfo._1
- val partition = partitionInfo._2
-
- // If the partition does not exist locally, create it
- if(replicaManager.getPartition(topic, partition) == None){
- trace("the partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
- val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition)
- trace("assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
- if(assignedReplicas.contains(brokerId)) {
- val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
- info("starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
- }
- }
- val replica = replicaManager.getReplica(topic, partition).get
- // The command ask this broker to be new leader for P and it isn't the leader yet
- val requestedLeaderId = leaderAndISR.leader
- // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id)
- if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){
- info("becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
- errorCode = becomeLeader(replica, leaderAndISR)
- }
- else if (requestedLeaderId != brokerId) {
- info("becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
- errorCode = becomeFollower(replica, leaderAndISR)
- }
-
- responseMap.put(partitionInfo, errorCode)
- }
+ val responseMap = new HashMap[(String, Int), Short]
- if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
- replicaManager.startHighWaterMarksCheckPointThread
- val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet
- info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
- partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2))
+ // TODO: put in actual logic later
+ for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){
+ responseMap.put(key, ErrorMapping.NoError)
}
val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
@@ -118,9 +79,9 @@ class KafkaApis(val requestChannel: Requ
val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
val responseMap = new HashMap[(String, Int), Short]
+ // TODO: put in actual logic later
for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
- val errorCode = stopReplicaCbk(topic, partition)
- responseMap.put((topic, partition), errorCode)
+ responseMap.put((topic, partition), ErrorMapping.NoError)
}
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
@@ -134,7 +95,7 @@ class KafkaApis(val requestChannel: Requ
var satisfied = new mutable.ArrayBuffer[DelayedFetch]
for(partitionData <- partitionDatas)
satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
- trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+ trace("Produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
// send any newly unblocked responses
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch)
@@ -150,11 +111,11 @@ class KafkaApis(val requestChannel: Requ
val produceRequest = ProducerRequest.readFrom(request.request.buffer)
val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled)
- requestLogger.trace("producer request %s".format(produceRequest.toString))
+ requestLogger.trace("Producer request " + request.toString)
trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
val response = produceToLocalLog(produceRequest)
- debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+ debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
if (produceRequest.requiredAcks == 0 ||
produceRequest.requiredAcks == 1 ||
@@ -172,11 +133,13 @@ class KafkaApis(val requestChannel: Requ
(topic, partitionData.partition)
})
})
+
val delayedProduce = new DelayedProduce(
topicPartitionPairs, request,
response.errors, response.offsets,
produceRequest, produceRequest.ackTimeoutMs.toLong)
produceRequestPurgatory.watch(delayedProduce)
+
/*
* Replica fetch requests may have arrived (and potentially satisfied)
* delayedProduce requests before they even made it to the purgatory.
@@ -184,9 +147,10 @@ class KafkaApis(val requestChannel: Requ
*/
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
topicPartitionPairs.foreach(topicPartition =>
- satisfiedProduceRequests ++=
- produceRequestPurgatory.update(topicPartition, topicPartition))
- debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size))
+ satisfiedProduceRequests ++=
+ produceRequestPurgatory.update(topicPartition, topicPartition))
+ debug(satisfiedProduceRequests.size +
+ " DelayedProduce requests unblocked after produce to local log.")
satisfiedProduceRequests.foreach(_.respond())
}
}
@@ -195,10 +159,10 @@ class KafkaApis(val requestChannel: Requ
* Helper method for handling a parsed producer request
*/
private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
- trace("produce [%s] to local log ".format(request.toString))
val requestSize = request.topicPartitionCount
val errors = new Array[Short](requestSize)
val offsets = new Array[Long](requestSize)
+
var msgIndex = -1
for(topicData <- request.data) {
for(partitionData <- topicData.partitionDataArray) {
@@ -217,7 +181,7 @@ class KafkaApis(val requestChannel: Requ
case e =>
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
- error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
+ error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
e match {
case _: IOException =>
fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
@@ -229,8 +193,7 @@ class KafkaApis(val requestChannel: Requ
}
}
}
- val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
- ret
+ new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
}
/**
@@ -238,7 +201,9 @@ class KafkaApis(val requestChannel: Requ
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = FetchRequest.readFrom(request.request.buffer)
- trace("handling fetch request: " + fetchRequest.toString)
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Fetch request " + fetchRequest.toString)
+ trace("Broker %s received fetch request %s".format(logManager.config.brokerId, fetchRequest.toString))
// validate the request
try {
fetchRequest.validate()
@@ -260,7 +225,8 @@ class KafkaApis(val requestChannel: Requ
)
})
})
- debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+ trace("Replica %d fetch unblocked %d DelayedProduce requests.".format(
+ fetchRequest.replicaId, satisfiedProduceRequests.size))
satisfiedProduceRequests.foreach(_.respond())
}
@@ -270,11 +236,11 @@ class KafkaApis(val requestChannel: Requ
availableBytes >= fetchRequest.minBytes ||
fetchRequest.numPartitions <= 0) {
val topicData = readMessageSets(fetchRequest)
- debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+ debug("Returning fetch response %s for fetch request with correlation id %d"
+ .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
- debug("putting fetch request into purgatory")
// create a list of (topic, partition) pairs to use as keys for this delayed request
val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
@@ -290,6 +256,7 @@ class KafkaApis(val requestChannel: Requ
for(offsetDetail <- fetchRequest.offsetInfo) {
for(i <- 0 until offsetDetail.partitions.size) {
try {
+ debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
val available = maybeLog match {
case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
@@ -298,7 +265,7 @@ class KafkaApis(val requestChannel: Requ
totalBytes += math.min(offsetDetail.fetchSizes(i), available)
} catch {
case e: InvalidPartitionException =>
- info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+ info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
}
}
}
@@ -307,13 +274,13 @@ class KafkaApis(val requestChannel: Requ
private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
val offsets = fetchRequest.offsetInfo
- debug("act on update partition HW, check offset detail: %s ".format(offsets))
+
for(offsetDetail <- offsets) {
val topic = offsetDetail.topic
val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset,
- kafkaZookeeper.getZookeeperClient)
+ kafkaZookeeper.getZookeeperClient)
}
}
}
@@ -343,17 +310,21 @@ class KafkaApis(val requestChannel: Requ
BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
- assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId))
+ assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
+ " must exist on leader broker %d".format(logManager.config.brokerId))
val leaderReplica = leaderReplicaOpt.get
fetchRequest.replicaId match {
case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
case _ => // fetch request from a follower
val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
- assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, replicaManager.config.brokerId))
+ assert(replicaOpt.isDefined, "No replica %d in replica manager on %d"
+ .format(fetchRequest.replicaId, replicaManager.config.brokerId))
val replica = replicaOpt.get
- debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
- debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+ debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+ .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+ debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+ .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
}
}
@@ -372,7 +343,7 @@ class KafkaApis(val requestChannel: Requ
try {
// check if the current broker is the leader for the partitions
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
- trace("fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+ trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
val log = logManager.getLog(topic, partition)
response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
} catch {
@@ -389,7 +360,7 @@ class KafkaApis(val requestChannel: Requ
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
- requestLogger.trace("offset request " + offsetRequest.toString)
+ requestLogger.trace("Offset request " + offsetRequest.toString)
var response: OffsetResponse = null
try {
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
@@ -401,7 +372,8 @@ class KafkaApis(val requestChannel: Requ
System.exit(1)
case e =>
warn("Error while responding to offset request", e)
- response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+ response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+ ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
}
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
@@ -412,39 +384,41 @@ class KafkaApis(val requestChannel: Requ
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
- requestLogger.trace("topic metadata request " + metadataRequest.toString())
+ requestLogger.trace("Topic metadata request " + metadataRequest.toString())
+
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val zkClient = kafkaZookeeper.getZookeeperClient
var errorCode = ErrorMapping.NoError
val config = logManager.config
+
try {
val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
- metadataRequest.topics.zip(topicMetadataList).foreach(
- topicAndMetadata =>{
- val topic = topicAndMetadata._1
- topicAndMetadata._2.errorCode match {
- case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
- case ErrorMapping.UnknownTopicCode =>
- /* check if auto creation of topics is turned on */
- if(config.autoCreateTopics) {
- CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
- info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
- .format(topic, config.numPartitions, config.defaultReplicationFactor))
- val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
- newTopicMetadata.errorCode match {
- case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
- case _ =>
- throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
- }
+
+ metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+ val topic = topicAndMetadata._1
+ topicAndMetadata._2.errorCode match {
+ case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+ case ErrorMapping.UnknownTopicCode =>
+ /* check if auto creation of topics is turned on */
+ if(config.autoCreateTopics) {
+ CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+ info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+ .format(topic, config.numPartitions, config.defaultReplicationFactor))
+ val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+ newTopicMetadata.errorCode match {
+ case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
+ case _ =>
+ throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
}
- case _ => error("Error while fetching topic metadata for topic " + topic,
- ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
- }
- })
+ }
+ case _ => error("Error while fetching topic metadata for topic " + topic,
+ ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+ }
+ }
}catch {
case e => error("Error while retrieving topic metadata", e)
- // convert exception type to error code
- errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ // convert exception type to error code
+ errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
}
topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
@@ -452,10 +426,7 @@ class KafkaApis(val requestChannel: Requ
}
def close() {
- debug("shut down")
fetchRequestPurgatory.shutdown()
- produceRequestPurgatory.shutdown()
- debug("shutted down completely")
}
/**
@@ -468,7 +439,7 @@ class KafkaApis(val requestChannel: Requ
/**
* A holding pen for fetch requests waiting to be satisfied
*/
- class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") {
+ class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -476,7 +447,6 @@ class KafkaApis(val requestChannel: Requ
def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
val messageDataSize = partitionData.messages.sizeInBytes
val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
- debug("fetch request check, accm size: " + accumulatedSize + " delay fetch min bytes: " + delayedFetch.fetch.minBytes)
accumulatedSize >= delayedFetch.fetch.minBytes
}
@@ -496,7 +466,7 @@ class KafkaApis(val requestChannel: Requ
requiredOffsets: Array[Long],
val produce: ProducerRequest,
delayMs: Long)
- extends DelayedRequest(keys, request, delayMs) with Logging {
+ extends DelayedRequest(keys, request, delayMs) with Logging {
/**
* Map of (topic, partition) -> partition status
@@ -525,15 +495,15 @@ class KafkaApis(val requestChannel: Requ
def respond() {
val errorsAndOffsets: (List[Short], List[Long]) = (
- keys.foldRight
- ((List[Short](), List[Long]()))
- ((key: Any, result: (List[Short], List[Long])) => {
- val status = partitionStatus(key)
- (status.error :: result._1, status.requiredOffset :: result._2)
- })
- )
+ keys.foldRight
+ ((List[Short](), List[Long]()))
+ ((key: Any, result: (List[Short], List[Long])) => {
+ val status = partitionStatus(key)
+ (status.error :: result._1, status.requiredOffset :: result._2)
+ })
+ )
val response = new ProducerResponse(produce.versionId, produce.correlationId,
- errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+ errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
requestChannel.sendResponse(new RequestChannel.Response(
request, new BoundedByteBufferSend(response)))
@@ -569,7 +539,7 @@ class KafkaApis(val requestChannel: Requ
numAcks, produce.requiredAcks,
topic, partitionId))
if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
- (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+ (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
/*
* requiredAcks < 0 means acknowledge after all replicas in ISR
* are fully caught up to the (local) leader's offset
@@ -618,7 +588,8 @@ class KafkaApis(val requestChannel: Requ
/**
* A holding pen for produce requests waiting to be satisfied.
*/
- private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") {
+ private [kafka] class ProducerRequestPurgatory
+ extends RequestPurgatory[DelayedProduce, (String, Int)] {
protected def checkSatisfied(fetchRequestPartition: (String, Int),
delayedProduce: DelayedProduce) =