Posted to commits@kafka.apache.org by ne...@apache.org on 2012/06/01 03:53:21 UTC
svn commit: r1344964 [2/2] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/
core/src/main/scala/kafka/common/ core/src/main/scala/kafka/log/ core/src/...
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Fri Jun 1 01:53:19 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+import kafka.api.FetchRequestBuilder
+import kafka.utils.Logging
+import kafka.cluster.{Broker, Replica}
+import kafka.consumer.SimpleConsumer
+
+class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
+ extends Thread(name) with Logging {
+ val isRunning: AtomicBoolean = new AtomicBoolean(true)
+ private val shutdownLatch = new CountDownLatch(1)
+ private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port,
+ config.replicaSocketTimeoutMs, config.replicaSocketBufferSize)
+
+ override def run() {
+ try {
+ info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId))
+ while(isRunning.get()) {
+ val builder = new FetchRequestBuilder().
+ clientId(name).
+ replicaId(replica.brokerId).
+ maxWait(config.replicaMaxWaitTimeMs).
+ minBytes(config.replicaMinBytes)
+
+ // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message
+ // replication actually works
+ val fetchOffset = replica.logEndOffset()
+ trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d"
+ .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset))
+ builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize)
+
+ val fetchRequest = builder.build()
+ val response = replicaConsumer.fetch(fetchRequest)
+ // TODO: KAFKA-339 Check for error. Don't blindly read the messages
+ // append messages to local log
+ replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
+ // record the hw sent by the leader for this partition
+ val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw)
+ replica.highWatermark(Some(followerHighWatermark))
+ trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+ .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark()))
+ }
+ }catch {
+ case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name))
+ case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1)
+ }
+ shutdownComplete()
+ }
+
+ private def shutdownComplete() = {
+ replicaConsumer.close()
+ shutdownLatch.countDown
+ }
+
+ def getLeader(): Broker = leaderBroker
+
+ def shutdown() {
+ info("Shutting down replica fetcher thread")
+ isRunning.set(false)
+ interrupt()
+ shutdownLatch.await()
+ info("Replica fetcher thread shutdown completed")
+ }
+}
\ No newline at end of file
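
The fetch loop above sets the follower's high watermark to the smaller of its own log end offset and the hw the leader returned in the fetch response, so the follower never marks data committed beyond either bound. A minimal, self-contained sketch of that rule in plain Scala (FollowerState is a hypothetical stand-in for the Replica fields used here, not a class from this commit):

    object FollowerHwSketch {
      // Hypothetical stand-in for the follower-side state the fetch loop touches.
      final case class FollowerState(logEndOffset: Long, highWatermark: Long)

      // The follower HW is capped both by what it holds locally (its LEO) and by
      // what the leader reports as committed (the hw field in the fetch response).
      def afterFetch(state: FollowerState, leaderHw: Long): FollowerState =
        state.copy(highWatermark = math.min(state.logEndOffset, leaderHw))

      def main(args: Array[String]): Unit = {
        // Follower appended past the leader's committed offset: HW = leader hw.
        assert(afterFetch(FollowerState(42L, 0L), leaderHw = 40L).highWatermark == 40L)
        // Follower still lags the leader's committed offset: HW = local LEO.
        assert(afterFetch(FollowerState(37L, 0L), leaderHw = 40L).highWatermark == 37L)
        println("follower HW rule holds")
      }
    }
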
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Jun 1 01:53:19 2012
@@ -18,47 +18,232 @@ package kafka.server
import kafka.log.Log
import kafka.cluster.{Partition, Replica}
-import kafka.utils.Logging
import collection.mutable
+import java.lang.IllegalStateException
+import mutable.ListBuffer
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
+import kafka.common.InvalidPartitionException
+
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
+
+ private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
+ private var leaderReplicas = new ListBuffer[Partition]()
+ private val leaderReplicaLock = new ReentrantLock()
+ private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
+ // start ISR expiration thread
+ isrExpirationScheduler.startUp
+ isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+
+ def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
+ val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
+ val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+
+ val replicaOpt = partition.getReplica(config.brokerId)
+ replicaOpt match {
+ case Some(replica) =>
+ info("Changing remote replica %s into a local replica".format(replica.toString))
+ replica.log match {
+ case None =>
+ replica.log = Some(log)
+ case Some(log) => // nothing to do since log already exists
+ }
+ case None =>
+ partition.addReplica(localReplica)
+ }
+ val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
+ partition.assignedReplicas(Some(assignedReplicas))
+ // get the replica objects for the assigned replicas for this partition
+ info("Added local replica %d for topic %s partition %s on broker %d"
+ .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+ localReplica
+ }
-class ReplicaManager(config: KafkaConfig) extends Logging {
+ def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
+ val partitionExists = allReplicas.contains((topic, partitionId))
+ partitionExists match {
+ case true => // partition exists, do nothing
+ allReplicas.get((topic, partitionId)).get
+ case false => // create remote replicas for each replica id in assignedReplicas
+ val partition = new Partition(topic, partitionId, time)
+ allReplicas += (topic, partitionId) -> partition
+ (assignedReplicaIds - config.brokerId).foreach(
+ replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
+ partition
+ }
+ }
- private val replicas = new mutable.HashMap[(String, Int), Replica]()
+ def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
+ val partitionOpt = allReplicas.get((topic, partitionId))
+ partitionOpt match {
+ case Some(partition) => partition
+ case None =>
+ throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+ .format(topic, partitionId, config.brokerId))
+ }
+ }
- def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
- val replica = replicas.get((topic, partitionId))
- replica match {
- case Some(r) =>
- r.log match {
- case None =>
- r.log = Some(log)
- case Some(l) => // nothing to do since log already exists
+ def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
+ val remoteReplica = new Replica(replicaId, partition, topic)
+
+ val replicaAdded = partition.addReplica(remoteReplica)
+ if(replicaAdded)
+ info("Added remote replica %d for topic %s partition %s on broker %d"
+ .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
+ remoteReplica
+ }
+
+ def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
+ val replicasOpt = allReplicas.get((topic, partitionId))
+ replicasOpt match {
+ case Some(replicas) =>
+ replicas.getReplica(replicaId)
+ case None =>
+ None
+ }
+ }
+
+ def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
+ val replicasOpt = allReplicas.get((topic, partitionId))
+ replicasOpt match {
+ case Some(replicas) =>
+ Some(replicas.leaderReplica())
+ case None =>
+ throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+ "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+ }
+ }
+
+ def getPartition(topic: String, partitionId: Int): Option[Partition] =
+ allReplicas.get((topic, partitionId))
+
+ def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+ // set the replica leo
+ val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+ partition.updateReplicaLEO(replica, fetchOffset)
+ }
+
+ def maybeIncrementLeaderHW(replica: Replica) {
+ // set the replica leo
+ val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+ // set the leader HW to min of the leo of all replicas
+ val allLeos = partition.inSyncReplicas.map(_.logEndOffset())
+ val newHw = allLeos.min
+ val oldHw = partition.leaderHW()
+ if(newHw > oldHw) {
+ debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
+ partition.leaderHW(Some(newHw))
+ }
+ }
+
+ def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+ // stop replica fetcher thread, if any
+ replica.stopReplicaFetcherThread()
+ // read and cache the ISR
+ replica.partition.leaderId(Some(replica.brokerId))
+ replica.partition.updateISR(currentISRInZk.toSet)
+ // also add this partition to the list of partitions for which the leader is the current broker
+ try {
+ leaderReplicaLock.lock()
+ leaderReplicas += replica.partition
+ }finally {
+ leaderReplicaLock.unlock()
+ }
+ }
+
+ def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+ info("Broker %d becoming follower to leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+ // remove this replica's partition from the ISR expiration queue
+ try {
+ leaderReplicaLock.lock()
+ leaderReplicas -= replica.partition
+ }finally {
+ leaderReplicaLock.unlock()
+ }
+ replica.log match {
+ case Some(log) => // log is already started
+ log.recoverUptoLastCheckpointedHW()
+ case None =>
+ }
+ // get leader for this replica
+ val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
+ val isReplicaAFollower = replica.getIfFollowerAndLeader()
+ // Become follower only if it is not already following the same leader
+ if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) {
+ // stop fetcher thread to previous leader
+ replica.stopReplicaFetcherThread()
+ // start fetcher thread to current leader
+ replica.startReplicaFetcherThread(leaderBroker, config)
+ }
+ }
+
+ def maybeShrinkISR(): Unit = {
+ try {
+ info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
+ .format(config.keepInSyncTimeMs))
+
+ leaderReplicaLock.lock()
+ leaderReplicas.foreach { partition =>
+ // shrink ISR if a follower is slow or stuck
+ val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.keepInSyncTimeMs, config.keepInSyncBytes)
+ if(outOfSyncReplicas.size > 0) {
+ val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
+ assert(newInSyncReplicas.size > 0)
+ info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
+ newInSyncReplicas.map(_.brokerId).mkString(",")))
+ // update ISR in zk and in memory
+ partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
+ }
+ }
+ }catch {
+ case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
+ }finally {
+ leaderReplicaLock.unlock()
+ }
+ }
+
+ def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+ val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+ if(partition.inSyncReplicas.contains(replica)) false
+ else if(partition.assignedReplicas().contains(replica)) {
+ val leaderHW = partition.leaderHW()
+ replica.logEndOffset() >= leaderHW
+ }
+ else throw new IllegalStateException("Replica %s is not in the assigned replicas list for".format(replica.toString) +
+ " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+ }
+
+ def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
+ val replicaOpt = getReplica(topic, partition, replicaId)
+ replicaOpt match {
+ case Some(replica) =>
+ updateReplicaLEO(replica, offset)
+ // check if this replica needs to be added to the ISR
+ if(checkIfISRCanBeExpanded(replica)) {
+ val newISR = replica.partition.inSyncReplicas + replica
+ // update ISR in ZK and cache
+ replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient))
}
+ maybeIncrementLeaderHW(replica)
case None =>
- val partition = new Partition(topic, partitionId)
- val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
- replicas.put((topic, partitionId), replica)
- info("Added local replica for topic %s partition %s on broker %d"
- .format(replica.topic, replica.partition.partId, replica.brokerId))
+ throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
}
- replicas.get((topic, partitionId)).get
}
- def addRemoteReplica(topic: String, partitionId: Int): Replica = {
- val replica = replicas.get((topic, partitionId))
- replica match {
- case Some(r) =>
+ def recordLeaderLogUpdate(topic: String, partition: Int) = {
+ val replicaOpt = getReplica(topic, partition, config.brokerId)
+ replicaOpt match {
+ case Some(replica) =>
+ replica.logEndOffsetUpdateTime(Some(time.milliseconds))
case None =>
- val partition = new Partition(topic, partitionId)
- val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
- replicas.put((topic, partitionId), replica)
- info("Added remote replica for topic %s partition %s on broker %d"
- .format(replica.topic, replica.partition.partId, replica.brokerId))
+ throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
}
- replicas.get((topic, partitionId)).get
}
- def getReplica(topic: String, partitionId: Int): Option[Replica] = {
- replicas.get((topic, partitionId))
+ def close() {
+ isrExpirationScheduler.shutdown()
+ allReplicas.foreach(_._2.assignedReplicas().foreach(_.close()))
}
-}
\ No newline at end of file
+}
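
Two rules in the manager above are easy to state in isolation: maybeIncrementLeaderHW only ever moves the leader HW forward, to the minimum log end offset across the in-sync replicas, and maybeShrinkISR removes out-of-sync followers while asserting the ISR never shrinks to empty. A self-contained sketch of both rules (ReplicaView is a hypothetical stand-in for the Replica/Partition state, not part of this commit):

    object LeaderHwSketch {
      // Hypothetical view of one in-sync replica's fetch progress on the leader.
      final case class ReplicaView(brokerId: Int, logEndOffset: Long, outOfSync: Boolean)

      // The leader HW advances to the minimum LEO across the ISR and never regresses.
      def maybeIncrementLeaderHw(currentHw: Long, isr: Set[ReplicaView]): Long = {
        val newHw = isr.map(_.logEndOffset).min
        if (newHw > currentHw) newHw else currentHw
      }

      // Shrinking the ISR drops out-of-sync followers but must leave at least one replica.
      def shrinkIsr(isr: Set[ReplicaView]): Set[ReplicaView] = {
        val inSync = isr.filterNot(_.outOfSync)
        assert(inSync.nonEmpty, "ISR must never shrink to empty")
        inSync
      }

      def main(args: Array[String]): Unit = {
        val isr = Set(ReplicaView(0, 10L, outOfSync = false), ReplicaView(1, 7L, outOfSync = true))
        assert(maybeIncrementLeaderHw(5L, isr) == 7L)    // min LEO across the ISR
        assert(shrinkIsr(isr).map(_.brokerId) == Set(0)) // lagging follower dropped
        println("leader HW and ISR-shrink rules hold")
      }
    }
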
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Fri Jun 1 01:53:19 2012
@@ -21,7 +21,6 @@ import scala.collection._
import java.util.LinkedList
import java.util.concurrent._
import java.util.concurrent.atomic._
-import kafka.api._
import kafka.network._
import kafka.utils._
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Fri Jun 1 01:53:19 2012
@@ -39,24 +39,24 @@ class KafkaScheduler(val numThreads: Int
def hasShutdown: Boolean = executor.isShutdown
- private def checkIfExecutorHasStarted = {
+ private def ensureExecutorHasStarted = {
if(executor == null)
throw new IllegalStateException("Kafka scheduler has not been started")
}
def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
- checkIfExecutorHasStarted
+ ensureExecutorHasStarted
executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
}
def shutdownNow() {
- checkIfExecutorHasStarted
+ ensureExecutorHasStarted
executor.shutdownNow()
info("Forcing shutdown of scheduler " + baseThreadName)
}
def shutdown() {
- checkIfExecutorHasStarted
+ ensureExecutorHasStarted
executor.shutdown()
info("Shutdown scheduler " + baseThreadName)
}
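
The rename above only changes the name of the guard that every schedule and shutdown call passes through; the scheduling behaviour is untouched. For readers without the Kafka sources at hand, a rough model of what scheduleWithRate wraps, mirroring the ISR-expiration wiring in the ReplicaManager change above (the task body is a stand-in, not the real maybeShrinkISR):

    import java.util.concurrent.{Executors, TimeUnit}

    object SchedulerSketch {
      def main(args: Array[String]): Unit = {
        // scheduleWithRate(fun, delayMs, periodMs) delegates to scheduleAtFixedRate,
        // as the unchanged method body above shows.
        val executor = Executors.newScheduledThreadPool(1)
        val maybeShrinkISR = new Runnable {
          def run(): Unit = println("evaluating ISR") // stand-in for ReplicaManager.maybeShrinkISR
        }
        executor.scheduleAtFixedRate(maybeShrinkISR, 0L, 100L, TimeUnit.MILLISECONDS)
        Thread.sleep(350)   // let the task fire a few times
        executor.shutdown() // the call the renamed ensureExecutorHasStarted guards
      }
    }
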
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Fri Jun 1 01:53:19 2012
@@ -133,20 +133,21 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
- def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
+ def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
try {
// NOTE: first increment epoch, then become leader
val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
"%d;%d".format(brokerId, newEpoch))
val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+ val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
- "%s;%d".format(currentISR.mkString(","), newEpoch))
+ "%s;%d".format(updatedISR.mkString(","), newEpoch))
info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
- Some(newEpoch)
+ Some(newEpoch, updatedISR)
} catch {
case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
- case oe => None
+ case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
}
}
@@ -161,7 +162,7 @@ object ZkUtils extends Logging {
val newEpoch = epoch match {
case Some(partitionEpoch) =>
- debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, epoch))
+ debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
partitionEpoch + 1
case None =>
// this is the first time leader is elected for this partition. So set epoch to 1
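
With the signature change above, a successful election now returns the new epoch together with the ISR that was written, and an empty ISR in ZK is seeded with the newly elected broker. A small plain-Scala sketch of that seeding rule and of how a caller unpacks the pair (mirroring the LeaderElectionTest changes further down; no ZooKeeper involved):

    object LeaderElectionSketch {
      // Models the value tryToBecomeLeaderForPartition now returns on success:
      // the incremented epoch plus the ISR written to ZK, seeded with the new
      // leader when no ISR existed yet (the first election for the partition).
      def electedState(brokerId: Int, newEpoch: Int, currentIsr: Seq[Int]): (Int, Seq[Int]) = {
        val updatedIsr = if (currentIsr.isEmpty) List(brokerId) else currentIsr
        (newEpoch, updatedIsr)
      }

      def main(args: Array[String]): Unit = {
        // First election: ISR in ZK is empty, so it becomes just the new leader.
        assert(electedState(brokerId = 0, newEpoch = 1, currentIsr = Nil) == (1, List(0)))
        // Re-election with an existing ISR: the ISR is carried over unchanged.
        assert(electedState(brokerId = 1, newEpoch = 2, currentIsr = List(0, 1)) == (2, List(0, 1)))
        println("ISR seeding rule holds")
      }
    }
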
Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Fri Jun 1 01:53:19 2012
@@ -18,7 +18,7 @@ log4j.appender.stdout=org.apache.log4j.C
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-log4j.logger.kafka=INFO
+log4j.logger.kafka=ERROR
# zkclient can be verbose, during debugging it is common to adjust it separately
log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala Fri Jun 1 01:53:19 2012
@@ -43,7 +43,7 @@ class TopicCountTest extends JUnitSuite
*/
@Test
def testPartition() {
- assertTrue(new Partition("foo", 10) == new Partition("foo", 10))
- assertTrue(new Partition("foo", 1) != new Partition("foo", 0))
+ assertTrue(new Partition("foo", 10).equals(new Partition("foo", 10)))
+ assertTrue(!new Partition("foo", 1).equals(new Partition("foo", 0)))
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Fri Jun 1 01:53:19 2012
@@ -18,8 +18,6 @@
package kafka.integration
import junit.framework.Assert._
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.atomic.AtomicInteger
import kafka.utils.{ZKGroupTopicDirs, Logging}
import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
import kafka.server._
@@ -61,10 +59,10 @@ class AutoOffsetResetTest extends JUnit3
def testResetToEarliestWhenOffsetTooLow() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
- def testResetToLatestWhenOffsetTooHigh() =
+ def testResetToLatestWhenOffsetTooHigh() =
assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
-
- def testResetToLatestWhenOffsetTooLow() =
+
+ def testResetToLatestWhenOffsetTooLow() =
assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
/* Produce the given number of messages, create a consumer with the given offset policy,
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Fri Jun 1 01:53:19 2012
@@ -25,6 +25,7 @@ import kafka.consumer.SimpleConsumer
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -55,6 +56,8 @@ class BackwardsCompatibilityTest extends
// test for reading data with magic byte 0
def testProtocolVersion0() {
+ CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
var fetchOffset: Long = 0L
var messageCount: Int = 0
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Fri Jun 1 01:53:19 2012
@@ -30,10 +30,11 @@ import kafka.integration.KafkaServerTest
import kafka.producer.{ProducerData, Producer}
import kafka.utils.TestUtils._
import kafka.utils.TestUtils
+import kafka.admin.CreateTopicCommand
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
- val numNodes = 2
+ val numNodes = 1
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props)
@@ -54,6 +55,7 @@ class FetcherTest extends JUnit3Suite wi
override def setUp() {
super.setUp
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
fetcher.stopConnectionsToAllBrokers
fetcher.startConnections(topicInfos, cluster)
@@ -69,11 +71,9 @@ class FetcherTest extends JUnit3Suite wi
var count = sendMessages(perNode)
waitUntilLeaderIsElected(zkClient, topic, 0, 500)
fetch(count)
- Thread.sleep(100)
assertQueueEmpty()
count = sendMessages(perNode)
fetch(count)
- Thread.sleep(100)
assertQueueEmpty()
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Fri Jun 1 01:53:19 2012
@@ -17,23 +17,22 @@
package kafka.integration
-import java.io.File
import java.nio.ByteBuffer
-import java.util.Properties
import junit.framework.Assert._
-import kafka.admin.CreateTopicCommand
import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
-import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.message.Message
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import java.util.Properties
import kafka.producer.{ProducerData, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import kafka.message.Message
import kafka.utils.{TestZKUtils, TestUtils}
import org.apache.log4j.{Level, Logger}
import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
+import kafka.admin.CreateTopicCommand
+import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
/**
* End to end tests of the primitive apis against a local server
@@ -93,7 +92,7 @@ class PrimitiveApiTest extends JUnit3Sui
case e: FetchRequestFormatException => "success"
}
}
-
+
def testDefaultEncoderProducerAndFetch() {
val topic = "test-topic"
val props = new Properties()
@@ -158,7 +157,7 @@ class PrimitiveApiTest extends JUnit3Sui
messages += topic -> messageList
producer.send(producerData)
builder.addFetch(topic, partition, 0, 10000)
- }
+ }
// wait a bit for produced message to be available
Thread.sleep(700)
@@ -199,8 +198,9 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, partition) <- topics)
+ for( (topic, partition) <- topics) {
response.messageSet(topic, -1).iterator
+ }
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: InvalidPartitionException => "this is good"
@@ -332,10 +332,10 @@ class PrimitiveApiTest extends JUnit3Sui
def testConsumerNotExistTopic() {
val newTopic = "new-topic"
+ CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+ Thread.sleep(200)
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
- val logFile = new File(config.logDir, newTopic + "-0")
- assertTrue(!logFile.exists)
}
/**
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Fri Jun 1 01:53:19 2012
@@ -27,9 +27,9 @@ import org.easymock.EasyMock
import kafka.network._
import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
import kafka.cluster.Broker
-import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
+import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
@@ -80,6 +80,7 @@ class TopicMetadataTest extends JUnit3Su
// topic metadata request only requires 2 APIs from the log manager
val logManager = EasyMock.createMock(classOf[LogManager])
val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+ val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
EasyMock.replay(logManager)
@@ -94,7 +95,7 @@ class TopicMetadataTest extends JUnit3Su
// create the kafka request handler
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
+ val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Fri Jun 1 01:53:19 2012
@@ -63,33 +63,6 @@ class LogOffsetTest extends JUnit3Suite
}
@Test
- def testEmptyLogs() {
- val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
- assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
-
- val name = "test"
- val logFile = new File(logDir, name + "-0")
-
- {
- val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
- assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
- assertTrue(!logFile.exists())
- }
-
- {
- val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
- assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
- assertTrue(!logFile.exists())
- }
-
- {
- val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
- assertEquals( 0, offsets.length )
- assertTrue(!logFile.exists())
- }
- }
-
- @Test
def testGetOffsetsBeforeLatestTime() {
val topicPartition = "kafka-" + 0
val topic = topicPartition.split("-").head
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Fri Jun 1 01:53:19 2012
@@ -50,7 +50,7 @@ class LogTest extends JUnitSuite {
createEmptyLogs(logDir, 0)
new Log(logDir, 1024, 1000, false)
}
-
+
@Test
def testLoadInvalidLogsFails() {
createEmptyLogs(logDir, 0, 15)
@@ -61,7 +61,7 @@ class LogTest extends JUnitSuite {
case e: IllegalStateException => "This is good"
}
}
-
+
@Test
def testAppendAndRead() {
val log = new Log(logDir, 1024, 1000, false)
@@ -77,7 +77,7 @@ class LogTest extends JUnitSuite {
}
assertEquals(10, current)
}
-
+
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
@@ -96,7 +96,7 @@ class LogTest extends JUnitSuite {
case e: OffsetOutOfRangeException => "This is good."
}
}
-
+
/** Test that writing and reading beyond the log size boundary works */
@Test
def testLogRolls() {
@@ -106,7 +106,7 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
log.flush
-
+
/* now do successive reads and iterate over the resulting message sets counting the messages
* we should find exact 100 messages.
*/
@@ -124,7 +124,7 @@ class LogTest extends JUnitSuite {
}
assertEquals("We did not find all the messages we put in", numMessages, current)
}
-
+
@Test
def testFindSegment() {
assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
@@ -185,7 +185,7 @@ class LogTest extends JUnitSuite {
val deletedSegments = log.markDeletedWhile(_ => true)
// we shouldn't delete the last empty log segment.
- assertTrue(deletedSegments.size == 0)
+ assertTrue("We shouldn't delete the last empty log segment", log.segments.view.size == 1)
// we now have a new log
assertEquals(curOffset, log.nextAppendOffset)
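
The reworked assertion above pins down the intended behaviour: even when the predicate matches every segment, markDeletedWhile must leave the last (active) segment in place so the log can keep appending. A toy model of that rule, under the assumption that deletion proceeds from the head of the segment list and the final segment is always exempt (the Log class itself is not part of this diff):

    object MarkDeletedSketch {
      // Delete head segments matching the predicate, but never the last segment.
      def markDeletedWhile(segments: List[Int], predicate: Int => Boolean): (List[Int], List[Int]) = {
        if (segments.isEmpty) (segments, Nil)
        else {
          val deletable = segments.init.takeWhile(predicate) // the last segment is exempt
          (segments.drop(deletable.size), deletable)
        }
      }

      def main(args: Array[String]): Unit = {
        val (remaining, deleted) = markDeletedWhile(List(1, 2, 3), _ => true)
        assert(remaining == List(3) && deleted == List(1, 2)) // last segment survives
        println("last segment preserved")
      }
    }
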
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala Fri Jun 1 01:53:19 2012
@@ -49,6 +49,53 @@ class SegmentListTest extends JUnitSuite
}
@Test
+ def testTruncLast() {
+ {
+ val hd = List(1,2,3)
+ val tail = List(4,5,6)
+ val sl = new SegmentList(hd ::: tail)
+ val view = sl.view
+ assertEquals(hd ::: tail, view.iterator.toList)
+ val deleted = sl.truncLast(2)
+ assertEquals(hd, sl.view.iterator.toList)
+ assertEquals(tail, deleted.iterator.toList)
+ assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+ }
+
+ {
+ val hd = List(1,2,3,4,5)
+ val tail = List(6)
+ val sl = new SegmentList(hd ::: tail)
+ val view = sl.view
+ assertEquals(hd ::: tail, view.iterator.toList)
+ try {
+ sl.truncLast(6)
+ sl.truncLast(5)
+ sl.truncLast(-1)
+ fail("Attempt to truncate with illegal index should fail")
+ }catch {
+ case e: IllegalArgumentException => // this is ok
+ }
+ val deleted = sl.truncLast(4)
+ assertEquals(hd, sl.view.iterator.toList)
+ assertEquals(tail, deleted.iterator.toList)
+ assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+ }
+
+ {
+ val hd = List(1)
+ val tail = List(2,3,4,5,6)
+ val sl = new SegmentList(hd ::: tail)
+ val view = sl.view
+ assertEquals(hd ::: tail, view.iterator.toList)
+ val deleted = sl.truncLast(0)
+ assertEquals(hd, sl.view.iterator.toList)
+ assertEquals(tail, deleted.iterator.toList)
+ assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+ }
+ }
+
+ @Test
def testTruncBeyondList() {
val sl = new SegmentList(List(1, 2))
sl.trunc(3)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Jun 1 01:53:19 2012
@@ -91,32 +91,30 @@ class ProducerTest extends JUnit3Suite w
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
-
+ // create topic with 1 partition
+ CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
val producer = new Producer[String, String](config)
try {
- // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0, but
- // since partition 0 can exist on any of the two brokers, we need to fetch from both brokers
+ // Available partition ids should be 0.
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
- Thread.sleep(100)
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
- Thread.sleep(1000)
- // cross check if one of the brokers got the messages
- val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
- val messageSet1 = response1.messageSet("new-topic", 0).iterator
- val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
- val messageSet2 = response2.messageSet("new-topic", 0).iterator
- assertTrue("Message set should have 1 message", (messageSet1.hasNext || messageSet2.hasNext))
-
- if(messageSet1.hasNext) {
- assertEquals(new Message("test1".getBytes), messageSet1.next.message)
- assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet1.next.message)
- }
- else {
- assertEquals(new Message("test1".getBytes), messageSet2.next.message)
- assertTrue("Message set should have 1 message", messageSet2.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+ // get the leader
+ val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+ assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
+ val leader = leaderOpt.get
+
+ val messageSet = if(leader == server1.config.brokerId) {
+ val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ response1.messageSet("new-topic", 0).iterator
+ }else {
+ val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ response2.messageSet("new-topic", 0).iterator
}
+ assertTrue("Message set should have 1 message", messageSet.hasNext)
+
+ assertEquals(new Message("test1".getBytes), messageSet.next.message)
+ assertTrue("Message set should have 1 message", messageSet.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet.next.message)
} catch {
case e: Exception => fail("Not expected", e)
} finally {
@@ -178,65 +176,6 @@ class ProducerTest extends JUnit3Suite w
}
@Test
- def testZKSendToExistingTopicWithNoBrokers() {
- val props = new Properties()
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("partitioner.class", "kafka.utils.StaticPartitioner")
- props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
- val config = new ProducerConfig(props)
-
- val producer = new Producer[String, String](config)
- var server: KafkaServer = null
-
- // create topic
- CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-
- try {
- // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
- // all partitions have broker 0 as the leader.
- producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
- Thread.sleep(100)
- // cross check if brokers got the messages
- val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
- val messageSet1 = response1.messageSet("new-topic", 0).iterator
- assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test".getBytes), messageSet1.next.message)
-
- // shutdown server2
- server2.shutdown
- server2.awaitShutdown()
- Thread.sleep(100)
- // delete the new-topic logs
- Utils.rm(server2.config.logDir)
- Thread.sleep(100)
- // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- val config2 = new KafkaConfig(props2) {
- override val numPartitions = 4
- }
- server = TestUtils.createServer(config2)
- Thread.sleep(100)
-
- // now there are no brokers registered under test-topic.
- producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
- Thread.sleep(100)
-
- // cross check if brokers got the messages
- val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
- val messageSet2 = response2.messageSet("new-topic", 0).iterator
- assertTrue("Message set should have 1 message", messageSet2.hasNext)
- assertEquals(new Message("test".getBytes), messageSet2.next.message)
-
- } catch {
- case e: Exception => fail("Not expected", e)
- } finally {
- if(server != null) server.shutdown
- producer.close
- }
- }
-
- @Test
def testAsyncSendCanCorrectlyFailWithTimeout() {
val timeoutMs = 500
val props = new Properties()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Fri Jun 1 01:53:19 2012
@@ -177,7 +177,7 @@ class SyncProducerTest extends JUnit3Sui
// stop IO threads and request handling, but leave networking operational
// any requests should be accepted and queue up, but not handled
server.requestHandlerPool.shutdown()
-
+
val t1 = SystemTime.milliseconds
try {
val response2 = producer.send(request)
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Fri Jun 1 01:53:19 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import collection.mutable.HashMap
+import collection.mutable.Map
+import kafka.cluster.{Partition, Replica}
+import org.easymock.EasyMock
+import kafka.log.Log
+import kafka.utils.{Time, MockTime, TestUtils}
+import org.junit.Assert._
+import org.I0Itec.zkclient.ZkClient
+
+class ISRExpirationTest extends JUnit3Suite {
+
+ var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
+ val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+ override val keepInSyncTimeMs = 100L
+ override val keepInSyncBytes = 10L
+ })
+ val topic = "foo"
+
+ def testISRExpirationForStuckFollowers() {
+ val time = new MockTime
+ // create leader replica
+ val log = EasyMock.createMock(classOf[kafka.log.Log])
+ EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
+ EasyMock.expect(log.setHW(5L)).times(1)
+ EasyMock.replay(log)
+
+ // add one partition
+ val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L)
+ assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+ val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+ // set remote replicas leo to something low, like 2
+ (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+
+ time.sleep(150)
+ leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+
+ var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+ // add all replicas back to the ISR
+ partition0.inSyncReplicas ++= partition0.assignedReplicas()
+ assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+
+ leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+ // let the follower catch up only to 3
+ (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+ time.sleep(150)
+ // now follower broker id 1 has caught up to only 3, while the leader is at 5, AND follower broker id 1 hasn't
+ // pulled any data for more than keepInSyncTimeMs. So it is stuck
+ partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+ EasyMock.verify(log)
+ }
+
+ def testISRExpirationForSlowFollowers() {
+ val time = new MockTime
+ // create leader replica
+ val log = getLogWithHW(15L)
+ // add one partition
+ val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 15L)
+ assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+ val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+ // set remote replicas leo to something low, like 4
+ (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+
+ time.sleep(150)
+ leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+ time.sleep(10)
+ (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+
+ val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+ EasyMock.verify(log)
+ }
+
+ def testISRExpirationForMultiplePartitions() {
+ val time = new MockTime
+ // mock zkclient
+ val zkClient = EasyMock.createMock(classOf[ZkClient])
+ EasyMock.replay(zkClient)
+ // create replica manager
+ val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+ try {
+ val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+ // create leader log
+ val log0 = getLogWithHW(5L)
+
+ // create leader and follower replicas
+ val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+ val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+
+ partition0.inSyncReplicas = Set(followerReplicaPartition0, leaderReplicaPartition0)
+ // set the leader and its hw and the hw update time
+ partition0.leaderId(Some(configs.head.brokerId))
+ partition0.leaderHW(Some(5L))
+
+ // set the leo for non-leader replicas to something low
+ (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+
+ val log1 = getLogWithHW(15L)
+ // create leader and follower replicas for partition 1
+ val partition1 = replicaManager.getOrCreatePartition(topic, 1, configs.map(_.brokerId).toSet)
+ val leaderReplicaPartition1 = replicaManager.addLocalReplica(topic, 1, log1, configs.map(_.brokerId).toSet)
+ val followerReplicaPartition1 = replicaManager.addRemoteReplica(topic, 1, configs.last.brokerId, partition1)
+
+ partition1.inSyncReplicas = Set(followerReplicaPartition1, leaderReplicaPartition1)
+ // set the leader and its hw and the hw update time
+ partition1.leaderId(Some(configs.head.brokerId))
+ partition1.leaderHW(Some(15L))
+
+ // set the leo for non-leader replicas to something low
+ (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+
+ time.sleep(150)
+ leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
+ leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+ time.sleep(10)
+ (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+
+ val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+ val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ assertEquals("Replica 0 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
+
+ EasyMock.verify(log0)
+ EasyMock.verify(log1)
+ }catch {
+ case e => e.printStackTrace()
+ }finally {
+ replicaManager.close()
+ }
+ }
+
+ private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+ localLog: Log, leaderHW: Long): Partition = {
+ val partition = new Partition(topic, partitionId, time)
+ val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+
+ val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+ partition.assignedReplicas(Some(allReplicas.toSet))
+ // set in sync replicas for this partition to all the assigned replicas
+ partition.inSyncReplicas = allReplicas.toSet
+ // set the leader and its hw and the hw update time
+ partition.leaderId(Some(leaderId))
+ partition.leaderHW(Some(leaderHW))
+ partition
+ }
+
+ private def getLogWithHW(hw: Long): Log = {
+ val log1 = EasyMock.createMock(classOf[kafka.log.Log])
+ EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
+ EasyMock.expect(log1.setHW(hw)).times(1)
+ EasyMock.replay(log1)
+
+ log1
+ }
+
+ private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+ configs.filter(_.brokerId != leaderId).map { config =>
+ new Replica(config.brokerId, partition, topic)
+ }
+ }
+}
\ No newline at end of file
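
The tests above exercise the two ways a follower falls out of the ISR: it is stuck if its log end offset has not moved for keepInSyncTimeMs, and slow if it keeps fetching but trails the leader by more than keepInSyncBytes. A self-contained sketch of that predicate, assuming this is the rule Partition.getOutOfSyncReplicas applies (Partition itself is not shown in this diff):

    object OutOfSyncSketch {
      // Hypothetical view of a follower's progress as tracked on the leader.
      final case class Follower(brokerId: Int, logEndOffset: Long, lastLeoUpdateMs: Long)

      // stuck = no LEO update for longer than keepInSyncTimeMs
      // slow  = LEO more than keepInSyncBytes behind the leader's LEO
      def outOfSync(f: Follower, leaderLeo: Long, nowMs: Long,
                    keepInSyncTimeMs: Long, keepInSyncBytes: Long): Boolean = {
        val stuck = nowMs - f.lastLeoUpdateMs > keepInSyncTimeMs
        val slow = leaderLeo - f.logEndOffset > keepInSyncBytes
        stuck || slow
      }

      def main(args: Array[String]): Unit = {
        val now = 1000L
        // Stuck: last update 150ms ago with keepInSyncTimeMs = 100, as in the first test.
        assert(outOfSync(Follower(1, 2L, now - 150), leaderLeo = 5L, nowMs = now,
          keepInSyncTimeMs = 100L, keepInSyncBytes = 10L))
        // Slow: updated just now, but 11 behind the leader with keepInSyncBytes = 10.
        assert(outOfSync(Follower(1, 4L, now), leaderLeo = 15L, nowMs = now,
          keepInSyncTimeMs = 100L, keepInSyncBytes = 10L))
        // In sync: recent update and within the lag bound.
        assert(!outOfSync(Follower(1, 14L, now), leaderLeo = 15L, nowMs = now,
          keepInSyncTimeMs = 100L, keepInSyncBytes = 10L))
        println("out-of-sync predicate matches the test expectations")
      }
    }
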
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Fri Jun 1 01:53:19 2012
@@ -51,6 +51,7 @@ class LeaderElectionTest extends JUnit3S
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
servers ++= List(server1, server2)
+ try {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
@@ -59,7 +60,7 @@ class LeaderElectionTest extends JUnit3S
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -68,7 +69,7 @@ class LeaderElectionTest extends JUnit3S
servers.head.shutdown()
// check if leader moves to the other server
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
Thread.sleep(zookeeper.tickTime)
@@ -85,9 +86,14 @@ class LeaderElectionTest extends JUnit3S
// test if the leader is the preferred replica
assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
- // shutdown the servers and delete data hosted on them
- servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
+ }catch {
+ case e => error("Error while running leader election test ", e)
+ } finally {
+ // shutdown the servers and delete data hosted on them
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
+
+ }
}
// Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,17 +111,17 @@ class LeaderElectionTest extends JUnit3S
var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
- assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+ assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
- assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+ assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
- assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+ assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
}finally {
TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Fri Jun 1 01:53:19 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import kafka.producer.ProducerData
+import kafka.serializer.StringEncoder
+import kafka.admin.CreateTopicCommand
+import kafka.cluster.{Replica, Partition, Broker}
+import kafka.utils.{MockTime, TestUtils}
+import junit.framework.Assert._
+import java.io.File
+import kafka.log.Log
+
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
+ val props = createBrokerConfigs(2)
+ val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+ var brokers: Seq[KafkaServer] = null
+ val topic = "foobar"
+
+ override def setUp() {
+ super.setUp()
+ brokers = configs.map(config => TestUtils.createServer(config))
+ }
+
+ override def tearDown() {
+ super.tearDown()
+ brokers.foreach(_.shutdown())
+ }
+
+ def testReplicaFetcherThread() {
+ val partition = 0
+ val testMessageList = List("test1", "test2", "test3", "test4")
+ val leaderBrokerId = configs.head.brokerId
+ val followerBrokerId = configs.last.brokerId
+ val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+
+ // create a topic and partition
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+
+ // send test messages to leader
+ val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+ producer.send(new ProducerData[String, String](topic, "test", testMessageList))
+
+ // create a tmp directory
+ val tmpLogDir = TestUtils.tempDir()
+ val replicaLogDir = new File(tmpLogDir, topic + "-" + partition)
+ replicaLogDir.mkdirs()
+ val replicaLog = new Log(replicaLogDir, 500, 500, false)
+
+ // create replica fetch thread
+ val time = new MockTime
+ val testPartition = new Partition(topic, partition, time)
+ testPartition.leaderId(Some(leaderBrokerId))
+ val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
+ val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
+
+ // start a replica fetch thread to the above broker
+ replicaFetchThread.start()
+
+ Thread.sleep(700)
+ replicaFetchThread.shutdown()
+
+ assertEquals(60L, testReplica.log.get.logEndOffset)
+ replicaLog.close()
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Fri Jun 1 01:53:19 2012
@@ -20,7 +20,6 @@ package kafka.server
import scala.collection._
import org.junit.{After, Before, Test}
import junit.framework.Assert._
-import kafka.server._
import kafka.message._
import kafka.api._
import kafka.utils.TestUtils
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Jun 1 01:53:19 2012
@@ -32,10 +32,10 @@ import kafka.cluster.Broker
import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
import scala.collection.Map
-import kafka.serializer.Encoder
-import kafka.api.{ProducerRequest, TopicData, PartitionData}
+import kafka.api.{TopicData, PartitionData}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
+import kafka.serializer.{DefaultEncoder, Encoder}
/**
* Utility functions to help with testing
@@ -122,6 +122,7 @@ object TestUtils extends Logging {
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("log.flush.interval", "1")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
+ props.put("replica.socket.timeout.ms", "1500")
props
}
@@ -280,12 +281,13 @@ object TestUtils extends Logging {
/**
* Create a producer for the given host and port
*/
- def createProducer[K, V](zkConnect: String): Producer[K, V] = {
+ def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
+ props.put("serializer.class", encoder.getClass.getCanonicalName)
new Producer[K, V](new ProducerConfig(props))
}