You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/06/01 03:53:21 UTC

svn commit: r1344964 [2/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/log/ core/src/...

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Fri Jun  1 01:53:19 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+import kafka.api.FetchRequestBuilder
+import kafka.utils.Logging
+import kafka.cluster.{Broker, Replica}
+import kafka.consumer.SimpleConsumer
+
+/**
+ * Thread that keeps one follower replica caught up with its leader. While running it
+ * repeatedly issues a single-partition fetch to the leader starting at the replica's
+ * log end offset, appends the returned messages to the replica's local log and then
+ * sets the replica's high watermark to min(local log end offset, hw returned by the leader).
+ *
+ * @param name         thread name; also used as the client id of every fetch request
+ * @param replica      follower replica being filled; its log must be defined
+ * @param leaderBroker broker the fetch requests are sent to
+ * @param config       source of the replica socket, max-wait, min-bytes and fetch-size settings
+ */
+class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
+  extends Thread(name) with Logging {
+  // polled by the fetch loop; cleared by shutdown()
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  // counted down once run() has finished its cleanup
+  private val shutdownLatch = new CountDownLatch(1)
+  private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port,
+    config.replicaSocketTimeoutMs, config.replicaSocketBufferSize)
+
+  /**
+   * Fetch loop. Ends when isRunning is cleared or when anything is thrown inside the loop;
+   * in both cases the consumer is closed and the shutdown latch is released before returning.
+   */
+  override def run() {
+    try {
+      info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId))
+      while(isRunning.get()) {
+        // a fresh builder per iteration, since addFetch below accumulates into it
+        val builder = new FetchRequestBuilder().
+          clientId(name).
+          replicaId(replica.brokerId).
+          maxWait(config.replicaMaxWaitTimeMs).
+          minBytes(config.replicaMinBytes)
+
+        // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message
+        // replication actually works
+        val fetchOffset = replica.logEndOffset()
+        trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset))
+        builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize)
+
+        val fetchRequest = builder.build()
+        val response = replicaConsumer.fetch(fetchRequest)
+        // TODO: KAFKA-339 Check for error. Don't blindly read the messages
+        // append messages to local log (Option.get: throws if the replica has no log)
+        replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
+        // record the hw sent by the leader for this partition, capped at the local log end offset
+        val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw)
+        replica.highWatermark(Some(followerHighWatermark))
+        trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark()))
+      }
+    }catch {
+      case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name))
+      case e1: Throwable => error("Error in replica fetcher thread. Shutting down due to ", e1)
+    }finally {
+      // in a finally block so that a failing catch handler cannot leave shutdown() waiting forever
+      shutdownComplete()
+    }
+  }
+
+  /** Closes the consumer and releases the shutdown latch, even if closing fails. */
+  private def shutdownComplete() {
+    try {
+      replicaConsumer.close()
+    }finally {
+      shutdownLatch.countDown()
+    }
+  }
+
+  /** Returns the broker this thread fetches from. */
+  def getLeader(): Broker = leaderBroker
+
+  /**
+   * Asks the fetch loop to stop, interrupts the thread and blocks until run() has finished.
+   * NOTE(review): the latch is only released from run(), so this call does not return if the
+   * thread was never started or if it is invoked from the fetcher thread itself.
+   */
+  def shutdown() {
+    info("Shutting down replica fetcher thread")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("Replica fetcher thread shutdown completed")
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Jun  1 01:53:19 2012
@@ -18,47 +18,232 @@ package kafka.server
 
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
-import kafka.utils.Logging
 import collection.mutable
+import java.lang.IllegalStateException
+import mutable.ListBuffer
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
+import kafka.common.InvalidPartitionException
+
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
+
+  // every partition known to this broker, keyed by (topic, partition id)
+  private val allReplicas = new mutable.HashMap[(String, Int), Partition]()
+  // partitions added by makeLeader and removed by makeFollower; accessed under leaderReplicaLock
+  private val leaderReplicas = new ListBuffer[Partition]()
+  private val leaderReplicaLock = new ReentrantLock()
+  // none of these references is ever reassigned, so they are vals
+  private val isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
+  // start ISR expiration thread
+  isrExpirationScheduler.startUp
+  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+
+  /**
+   * Registers the replica hosted on this broker for the given topic/partition. The partition
+   * is looked up through getOrCreatePartition, so it is created (with its remote replicas)
+   * if it is not known yet. Afterwards the partition's assigned replicas are recorded.
+   *
+   * @param topic              topic of the partition
+   * @param partitionId        id of the partition
+   * @param log                local log backing the replica
+   * @param assignedReplicaIds broker ids of all replicas assigned to the partition
+   * @return the replica object that the partition holds for this broker
+   */
+  def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
+    val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
+
+    val localReplica = partition.getReplica(config.brokerId) match {
+      case Some(replica) =>
+        info("Changing remote replica %s into a local replica".format(replica.toString))
+        // attach the log only if the existing replica does not have one yet
+        if(replica.log.isEmpty)
+          replica.log = Some(log)
+        // return the replica the partition actually tracks rather than a detached new object
+        replica
+      case None =>
+        val newReplica = new Replica(config.brokerId, partition, topic, Some(log))
+        partition.addReplica(newReplica)
+        newReplica
+    }
+    // get the replica objects for the assigned replicas for this partition
+    // (Option.get: fails if an assigned id has no replica registered in the partition)
+    val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
+    partition.assignedReplicas(Some(assignedReplicas))
+    info("Added local replica %d for topic %s partition %s on broker %d"
+      .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+    localReplica
+  }
 
-class ReplicaManager(config: KafkaConfig) extends Logging {
+  /**
+   * Returns the partition registered for (topic, partitionId). If none is registered, a new
+   * partition is created and stored, and a remote replica is added to it for every id in
+   * assignedReplicaIds other than this broker's own id.
+   */
+  def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
+    val key = (topic, partitionId)
+    allReplicas.get(key) match {
+      case Some(existing) => // partition exists, do nothing
+        existing
+      case None => // create remote replicas for each replica id in assignedReplicas
+        val created = new Partition(topic, partitionId, time)
+        allReplicas.put(key, created)
+        for(remoteId <- (assignedReplicaIds - config.brokerId))
+          addRemoteReplica(topic, partitionId, remoteId, created)
+        created
+    }
+  }
 
-  private val replicas = new mutable.HashMap[(String, Int), Replica]()
+  /**
+   * Looks up the partition registered for (topic, partitionId).
+   * Throws InvalidPartitionException if no such partition is registered.
+   */
+  def ensurePartitionExists(topic: String, partitionId: Int): Partition =
+    allReplicas.getOrElse((topic, partitionId),
+      throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+        .format(topic, partitionId, config.brokerId)))
 
-  def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
-        r.log match {
-          case None =>
-            r.log = Some(log)
-          case Some(l) => // nothing to do since log already exists
+  /**
+   * Creates a replica object for replicaId (constructed without a log) and offers it to the
+   * partition, logging when the partition reports that it was added.
+   * The newly created object is returned whether or not the partition accepted it.
+   */
+  def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
+    val created = new Replica(replicaId, partition, topic)
+
+    partition.addReplica(created) match {
+      case true =>
+        info("Added remote replica %d for topic %s partition %s on broker %d"
+          .format(created.brokerId, created.topic, created.partition.partitionId, config.brokerId))
+      case false => // not added, nothing to report
+    }
+    created
+  }
+
+  /**
+   * Returns the replica with id replicaId (this broker's id by default) of the given
+   * topic/partition, or None if either the partition or the replica is not registered.
+   */
+  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] =
+    allReplicas.get((topic, partitionId)).flatMap(_.getReplica(replicaId))
+
+  /**
+   * Returns the leader replica of the given topic/partition wrapped in Some.
+   * None is never returned: an unregistered partition raises IllegalStateException instead.
+   */
+  def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
+    val partition = allReplicas.getOrElse((topic, partitionId),
+      throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+        "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)))
+    Some(partition.leaderReplica())
+  }
+
+  /** Returns the partition registered for (topic, partitionId), or None if there is none. */
+  def getPartition(topic: String, partitionId: Int): Option[Partition] = {
+    val key = (topic, partitionId)
+    allReplicas.get(key)
+  }
+
+  /**
+   * Forwards the given offset for a replica to the partition registered for the replica's
+   * topic/partition. Fails via ensurePartitionExists if that partition is not registered.
+   */
+  def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+    // set the replica leo
+    ensurePartitionExists(replica.topic, replica.partition.partitionId).updateReplicaLEO(replica, fetchOffset)
+  }
+
+  /**
+   * Raises the leader high watermark of the replica's partition to the smallest log end
+   * offset among the partition's in-sync replicas, if that value is above the current one.
+   * The high watermark is never lowered here.
+   */
+  def maybeIncrementLeaderHW(replica: Replica) {
+    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+    // candidate HW is the min of the leo of all in-sync replicas
+    val candidateHw = partition.inSyncReplicas.map(_.logEndOffset()).min
+    val currentHw = partition.leaderHW()
+    if(candidateHw > currentHw) {
+      debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, candidateHw))
+      partition.leaderHW(Some(candidateHw))
+    }
+  }
+
+  /**
+   * Switches the given replica to the leader role: stops its fetcher thread, records this
+   * replica's broker as the partition leader, caches the supplied ISR and adds the partition
+   * to the list that maybeShrinkISR iterates over.
+   *
+   * @param replica        the local replica that becomes leader
+   * @param currentISRInZk broker ids of the in-sync replicas to cache for the partition
+   */
+  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    // stop replica fetcher thread, if any
+    replica.stopReplicaFetcherThread()
+    // read and cache the ISR
+    replica.partition.leaderId(Some(replica.brokerId))
+    replica.partition.updateISR(currentISRInZk.toSet)
+    // also add this partition to the list of partitions for which the leader is the current broker
+    // lock() stays outside the try: if it fails, the finally must not unlock a lock that is not held
+    leaderReplicaLock.lock()
+    try {
+      leaderReplicas += replica.partition
+    }finally {
+      leaderReplicaLock.unlock()
+    }
+  }
+
+  /**
+   * Switches the given replica to the follower role: removes its partition from the list
+   * that maybeShrinkISR iterates over, calls recoverUptoLastCheckpointedHW() on the local log
+   * if there is one, and (re)starts the fetcher thread unless the replica already follows
+   * the requested leader.
+   *
+   * @param replica        the local replica that becomes a follower
+   * @param leaderBrokerId broker id of the leader to follow
+   * @param zkClient       client used to look up the leader's broker info
+   */
+  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+    info("Broker %d becoming follower to leader %d for topic %s partition %d"
+      .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    // remove this replica's partition from the ISR expiration queue
+    // lock() stays outside the try: if it fails, the finally must not unlock a lock that is not held
+    leaderReplicaLock.lock()
+    try {
+      leaderReplicas -= replica.partition
+    }finally {
+      leaderReplicaLock.unlock()
+    }
+    // nothing to recover when the replica has no log yet
+    replica.log.foreach(_.recoverUptoLastCheckpointedHW())
+    // get leader for this replica (.head fails if no broker info is returned for the id)
+    val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
+    val isReplicaAFollower = replica.getIfFollowerAndLeader()
+    val alreadyFollowingLeader = isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id)
+    // Become follower only if it is not already following the same leader
+    if(!alreadyFollowingLeader) {
+      // stop fetcher thread to previous leader
+      replica.stopReplicaFetcherThread()
+      // start fetcher thread to current leader
+      replica.startReplicaFetcherThread(leaderBroker, config)
+    }
+  }
+
+  /**
+   * Task scheduled on the ISR expiration scheduler. For every partition in leaderReplicas it
+   * asks the partition for its out-of-sync replicas (using config.keepInSyncTimeMs and
+   * config.keepInSyncBytes) and, if there are any, updates the partition's ISR without them,
+   * passing the ZooKeeper client along. Anything thrown while doing so is logged, not rethrown.
+   */
+  def maybeShrinkISR(): Unit = {
+    // lock() stays outside the try: the finally below must only unlock a lock that was acquired
+    leaderReplicaLock.lock()
+    try {
+      info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+
+      leaderReplicas.foreach { partition =>
+         // shrink ISR if a follower is slow or stuck
+        val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.keepInSyncTimeMs, config.keepInSyncBytes)
+        if(outOfSyncReplicas.size > 0) {
+          val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
+          assert(newInSyncReplicas.size > 0)
+          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
+            newInSyncReplicas.map(_.brokerId).mkString(",")))
+          // update ISR in zk and in memory
+          partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
+        }
+      }
+    }catch {
+      case e1: Throwable => error("Error in ISR expiration thread. Shutting down due to ", e1)
+    }finally {
+      leaderReplicaLock.unlock()
+    }
+  }
+
+  /**
+   * Tells whether the replica should be added to the ISR of its partition.
+   * Returns false if the replica is already in the ISR. Otherwise the replica must be one of
+   * the partition's assigned replicas (IllegalStateException if not) and the result is
+   * whether its log end offset has reached the leader HW.
+   */
+  def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+    val alreadyInSync = partition.inSyncReplicas.contains(replica)
+    if(alreadyInSync)
+      false
+    else {
+      if(!partition.assignedReplicas().contains(replica))
+        throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+          " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+      val leaderHW = partition.leaderHW()
+      replica.logEndOffset() >= leaderHW
+    }
+  }
+
+  def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
+    val replicaOpt = getReplica(topic, partition, replicaId)
+    replicaOpt match {
+      case Some(replica) =>
+        updateReplicaLEO(replica, offset)
+        // check if this replica needs to be added to the ISR
+        if(checkIfISRCanBeExpanded(replica)) {
+          val newISR = replica.partition.inSyncReplicas + replica
+          // update ISR in ZK and cache
+          replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient))
         }
+        maybeIncrementLeaderHW(replica)
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
-        replicas.put((topic, partitionId), replica)
-        info("Added local replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
     }
-    replicas.get((topic, partitionId)).get
   }
 
-  def addRemoteReplica(topic: String, partitionId: Int): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
+  def recordLeaderLogUpdate(topic: String, partition: Int) = {
+    val replicaOpt = getReplica(topic, partition, config.brokerId)
+    replicaOpt match {
+      case Some(replica) =>
+        replica.logEndOffsetUpdateTime(Some(time.milliseconds))
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-        replicas.put((topic, partitionId), replica)
-        info("Added remote replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
     }
-    replicas.get((topic, partitionId)).get
   }
 
-  def getReplica(topic: String, partitionId: Int): Option[Replica] = {
-    replicas.get((topic, partitionId))
+  def close() {
+    isrExpirationScheduler.shutdown()
+    allReplicas.foreach(_._2.assignedReplicas().foreach(_.close()))
   }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Fri Jun  1 01:53:19 2012
@@ -21,7 +21,6 @@ import scala.collection._
 import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.api._
 import kafka.network._
 import kafka.utils._
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Fri Jun  1 01:53:19 2012
@@ -39,24 +39,24 @@ class KafkaScheduler(val numThreads: Int
 
   def hasShutdown: Boolean = executor.isShutdown
 
-  private def checkIfExecutorHasStarted = {
+  private def ensureExecutorHasStarted = {
     if(executor == null)
       throw new IllegalStateException("Kafka scheduler has not been started")
   }
 
   def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
-    checkIfExecutorHasStarted
+    ensureExecutorHasStarted
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
   }
 
   def shutdownNow() {
-    checkIfExecutorHasStarted
+    ensureExecutorHasStarted
     executor.shutdownNow()
     info("Forcing shutdown of scheduler " + baseThreadName)
   }
 
   def shutdown() {
-    checkIfExecutorHasStarted
+    ensureExecutorHasStarted
     executor.shutdown()
     info("Shutdown scheduler " + baseThreadName)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Fri Jun  1 01:53:19 2012
@@ -133,20 +133,21 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
     try {
       // NOTE: first increment epoch, then become leader
       val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
       createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
         "%d;%d".format(brokerId, newEpoch))
       val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
       updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
-        "%s;%d".format(currentISR.mkString(","), newEpoch))
+        "%s;%d".format(updatedISR.mkString(","), newEpoch))
       info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
-      Some(newEpoch)
+      Some(newEpoch, updatedISR)
     } catch {
       case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
-      case oe => None
+      case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
     }
   }
 
@@ -161,7 +162,7 @@ object ZkUtils extends Logging {
 
     val newEpoch = epoch match {
       case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, epoch))
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
         partitionEpoch + 1
       case None =>
         // this is the first time leader is elected for this partition. So set epoch to 1

Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Fri Jun  1 01:53:19 2012
@@ -18,7 +18,7 @@ log4j.appender.stdout=org.apache.log4j.C
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=INFO
+log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala Fri Jun  1 01:53:19 2012
@@ -43,7 +43,7 @@ class TopicCountTest extends JUnitSuite 
 */
   @Test
   def testPartition() {
-    assertTrue(new Partition("foo", 10) == new Partition("foo", 10))
-    assertTrue(new Partition("foo", 1) != new Partition("foo", 0))
+    assertTrue(new Partition("foo", 10).equals(new Partition("foo", 10)))
+    assertTrue(!new Partition("foo", 1).equals(new Partition("foo", 0)))
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Fri Jun  1 01:53:19 2012
@@ -18,8 +18,6 @@
 package kafka.integration
 
 import junit.framework.Assert._
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
@@ -61,10 +59,10 @@ class AutoOffsetResetTest extends JUnit3
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
     
-  def testResetToLatestWhenOffsetTooHigh() = 
+  def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
-    
-  def testResetToLatestWhenOffsetTooLow() = 
+
+  def testResetToLatestWhenOffsetTooLow() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
   
   /* Produce the given number of messages, create a consumer with the given offset policy, 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Fri Jun  1 01:53:19 2012
@@ -25,6 +25,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
 
 class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -55,6 +56,8 @@ class BackwardsCompatibilityTest extends
 
   // test for reading data with magic byte 0
   def testProtocolVersion0() {
+    CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
+    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
     var fetchOffset: Long = 0L
     var messageCount: Int = 0

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Fri Jun  1 01:53:19 2012
@@ -30,10 +30,11 @@ import kafka.integration.KafkaServerTest
 import kafka.producer.{ProducerData, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
+import kafka.admin.CreateTopicCommand
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
-  val numNodes = 2
+  val numNodes = 1
   val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
       yield new KafkaConfig(props)
@@ -54,6 +55,7 @@ class FetcherTest extends JUnit3Suite wi
 
   override def setUp() {
     super.setUp
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
     fetcher.stopConnectionsToAllBrokers
     fetcher.startConnections(topicInfos, cluster)
@@ -69,11 +71,9 @@ class FetcherTest extends JUnit3Suite wi
     var count = sendMessages(perNode)
     waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
-    Thread.sleep(100)
     assertQueueEmpty()
     count = sendMessages(perNode)
     fetch(count)
-    Thread.sleep(100)
     assertQueueEmpty()
   }
   

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Fri Jun  1 01:53:19 2012
@@ -17,23 +17,22 @@
 
 package kafka.integration
 
-import java.io.File
 import java.nio.ByteBuffer
-import java.util.Properties
 import junit.framework.Assert._
-import kafka.admin.CreateTopicCommand
 import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
-import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.message.Message
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import kafka.message.Message
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
+import kafka.admin.CreateTopicCommand
+import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -93,7 +92,7 @@ class PrimitiveApiTest extends JUnit3Sui
       case e: FetchRequestFormatException => "success"
     }
   }
-  
+
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
     val props = new Properties()
@@ -158,7 +157,7 @@ class PrimitiveApiTest extends JUnit3Sui
         messages += topic -> messageList
         producer.send(producerData)
         builder.addFetch(topic, partition, 0, 10000)
-      }
+    }
 
       // wait a bit for produced message to be available
       Thread.sleep(700)
@@ -199,8 +198,9 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
+        for( (topic, partition) <- topics) {
           response.messageSet(topic, -1).iterator
+        }
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: InvalidPartitionException => "this is good"
@@ -332,10 +332,10 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testConsumerNotExistTopic() {
     val newTopic = "new-topic"
+    CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+    Thread.sleep(200)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
-    val logFile = new File(config.logDir, newTopic + "-0")
-    assertTrue(!logFile.exists)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Fri Jun  1 01:53:19 2012
@@ -27,9 +27,9 @@ import org.easymock.EasyMock
 import kafka.network._
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
-import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -80,6 +80,7 @@ class TopicMetadataTest extends JUnit3Su
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
     EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
     EasyMock.replay(logManager)
@@ -94,7 +95,7 @@ class TopicMetadataTest extends JUnit3Su
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Fri Jun  1 01:53:19 2012
@@ -63,33 +63,6 @@ class LogOffsetTest extends JUnit3Suite 
   }
 
   @Test
-  def testEmptyLogs() {
-    val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
-    assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
-
-    val name = "test"
-    val logFile = new File(logDir, name + "-0")
-    
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
-      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
-      assertTrue(!logFile.exists())
-    }
-
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
-      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
-      assertTrue(!logFile.exists())
-    }
-
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
-      assertEquals( 0, offsets.length )
-      assertTrue(!logFile.exists())
-    }
-  }
-
-  @Test
   def testGetOffsetsBeforeLatestTime() {
     val topicPartition = "kafka-" + 0
     val topic = topicPartition.split("-").head

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Fri Jun  1 01:53:19 2012
@@ -50,7 +50,7 @@ class LogTest extends JUnitSuite {
     createEmptyLogs(logDir, 0)
     new Log(logDir, 1024, 1000, false)
   }
-  
+
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
@@ -61,7 +61,7 @@ class LogTest extends JUnitSuite {
       case e: IllegalStateException => "This is good"
     }
   }
-  
+
   @Test
   def testAppendAndRead() {
     val log = new Log(logDir, 1024, 1000, false)
@@ -77,7 +77,7 @@ class LogTest extends JUnitSuite {
     }
     assertEquals(10, current)
   }
-  
+
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
@@ -96,7 +96,7 @@ class LogTest extends JUnitSuite {
       case e: OffsetOutOfRangeException => "This is good."
     }
   }
-  
+
   /** Test that writing and reading beyond the log size boundary works */
   @Test
   def testLogRolls() {
@@ -106,7 +106,7 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
     log.flush
-    
+
     /* now do successive reads and iterate over the resulting message sets counting the messages
      * we should find exact 100 messages.
      */
@@ -124,7 +124,7 @@ class LogTest extends JUnitSuite {
     }
     assertEquals("We did not find all the messages we put in", numMessages, current)
   }
-  
+
   @Test
   def testFindSegment() {
     assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
@@ -185,7 +185,7 @@ class LogTest extends JUnitSuite {
       val deletedSegments = log.markDeletedWhile(_ => true)
 
       // we shouldn't delete the last empty log segment.
-      assertTrue(deletedSegments.size == 0)
+      assertTrue("We shouldn't delete the last empty log segment", log.segments.view.size == 1)
 
       // we now have a new log
       assertEquals(curOffset, log.nextAppendOffset)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala Fri Jun  1 01:53:19 2012
@@ -49,6 +49,53 @@ class SegmentListTest extends JUnitSuite
   }
   
   @Test
+  def testTruncLast() {
+    // Exercises SegmentList.truncLast. Every case checks three things: the list contents after the call,
+    // the elements returned by the call, and that a view obtained before the call still shows the full list.
+    // case 1: truncLast(2) on a six element list is expected to keep the first three elements
+    {
+      val hd = List(1,2,3)
+      val tail = List(4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.truncLast(2)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+
+    // case 2: indices expected to be rejected, followed by a truncation that drops only the last element
+    {
+      val hd = List(1,2,3,4,5)
+      val tail = List(6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      try {
+        // NOTE(review): as soon as one of these calls throws, the remaining calls are skipped, so at most
+        // one illegal index is really exercised per run; consider one try block per index
+        sl.truncLast(6)
+        sl.truncLast(5)
+        sl.truncLast(-1)
+        fail("Attempt to truncate with illegal index should fail")
+      }catch {
+        case e: IllegalArgumentException => // this is ok
+      }
+      val deleted = sl.truncLast(4)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+
+    // case 3: truncLast(0) is expected to keep only the first element
+    {
+      val hd = List(1)
+      val tail = List(2,3,4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.truncLast(0)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+  }
+
+  @Test
   def testTruncBeyondList() {
     val sl = new SegmentList(List(1, 2))
     sl.trunc(3)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Jun  1 01:53:19 2012
@@ -91,32 +91,30 @@ class ProducerTest extends JUnit3Suite w
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-
+    // create topic with 1 partition
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
     val producer = new Producer[String, String](config)
     try {
-      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0, but
-      // since partition 0 can exist on any of the two brokers, we need to fetch from both brokers
+      // Available partition ids should be 0.
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(1000)
-      // cross check if one of the brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", (messageSet1.hasNext || messageSet2.hasNext))
-
-      if(messageSet1.hasNext) {
-        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-        assertTrue("Message set should have 1 message", messageSet1.hasNext)
-        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-      }
-      else {
-        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
-        assertTrue("Message set should have 1 message", messageSet2.hasNext)
-        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+      // get the leader
+      val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+      assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
+      val leader = leaderOpt.get
+
+      val messageSet = if(leader == server1.config.brokerId) {
+        val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+        response1.messageSet("new-topic", 0).iterator
+      }else {
+        val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+        response2.messageSet("new-topic", 0).iterator
       }
+      assertTrue("Message set should have 1 message", messageSet.hasNext)
+
+      assertEquals(new Message("test1".getBytes), messageSet.next.message)
+      assertTrue("Message set should have 1 message", messageSet.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet.next.message)
     } catch {
       case e: Exception => fail("Not expected", e)
     } finally {
@@ -178,65 +176,6 @@ class ProducerTest extends JUnit3Suite w
   }
 
   @Test
-  def testZKSendToExistingTopicWithNoBrokers() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
-    val config = new ProducerConfig(props)
-
-    val producer = new Producer[String, String](config)
-    var server: KafkaServer = null
-
-    // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-
-    try {
-      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-      // all partitions have broker 0 as the leader.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-      // cross check if brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
-
-      // shutdown server2
-      server2.shutdown
-      server2.awaitShutdown()
-      Thread.sleep(100)
-      // delete the new-topic logs
-      Utils.rm(server2.config.logDir)
-      Thread.sleep(100)
-      // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
-      val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-      val config2 = new KafkaConfig(props2) {
-        override val numPartitions = 4
-      }
-      server = TestUtils.createServer(config2)
-      Thread.sleep(100)
-
-      // now there are no brokers registered under test-topic.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-
-      // cross check if brokers got the messages
-      val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet2.next.message)
-
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      if(server != null) server.shutdown
-      producer.close
-    }
-  }
-
-  @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Fri Jun  1 01:53:19 2012
@@ -177,7 +177,7 @@ class SyncProducerTest extends JUnit3Sui
     // stop IO threads and request handling, but leave networking operational
     // any requests should be accepted and queue up, but not handled
     server.requestHandlerPool.shutdown()
-    
+
     val t1 = SystemTime.milliseconds
     try {
       val response2 = producer.send(request)

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Fri Jun  1 01:53:19 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import collection.mutable.HashMap
+import collection.mutable.Map
+import kafka.cluster.{Partition, Replica}
+import org.easymock.EasyMock
+import kafka.log.Log
+import kafka.utils.{Time, MockTime, TestUtils}
+import org.junit.Assert._
+import org.I0Itec.zkclient.ZkClient
+
+/**
+ * Tests for Partition.getOutOfSyncReplicas: which followers are reported as out of sync with the leader.
+ * The broker configs used here override keepInSyncTimeMs to 100 and keepInSyncBytes to 10, and every test
+ * takes its notion of time from a MockTime instance.
+ */
+class ISRExpirationTest extends JUnit3Suite {
+
+  var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
+  // two broker configs with small in-sync thresholds; configs.head plays the leader, configs.last the follower
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val keepInSyncTimeMs = 100L
+    override val keepInSyncBytes = 10L
+  })
+  val topic = "foo"
+
+  /**
+   * The follower's log end offset stays behind the leader's (2, later 3, versus 5) while more than
+   * keepInSyncTimeMs elapses on the mock clock; the follower must be reported as out of sync both times.
+   */
+  def testISRExpirationForStuckFollowers() {
+    val time = new MockTime
+    // create leader replica
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
+    EasyMock.expect(log.setHW(5L)).times(1)
+    EasyMock.replay(log)
+
+    // add one partition
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    // set remote replicas leo to something low, like 2
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+
+    time.sleep(150)
+    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+
+    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+    // add all replicas back to the ISR
+    partition0.inSyncReplicas ++= partition0.assignedReplicas()
+    assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+
+    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    // let the follower catch up only up to 3
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+    time.sleep(150)
+    // now follower broker id 1 has caught up only to 3, while the leader is at 5 AND follower broker id 1 hasn't
+    // pulled any data for > keepInSyncTimeMs ms. So it is stuck
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
+  /**
+   * The follower refreshes its log end offset update time after the leader does, but its log end offset (4)
+   * trails the leader's (15) by more than keepInSyncBytes; it must be reported as out of sync.
+   */
+  def testISRExpirationForSlowFollowers() {
+    val time = new MockTime
+    // create leader replica
+    val log = getLogWithHW(15L)
+    // add one partition
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 15L)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    // set remote replicas leo to something low, like 4
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+
+    time.sleep(150)
+    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    time.sleep(10)
+    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+
+    val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+    EasyMock.verify(log)
+  }
+
+  /**
+   * Same checks as the two tests above, but with two partitions registered through a ReplicaManager:
+   * partition 0 has a follower at log end offset 2 (leader at 5), partition 1 a follower at 4 (leader at 15).
+   * The follower must be reported as out of sync for both partitions.
+   */
+  def testISRExpirationForMultiplePartitions() {
+    val time = new MockTime
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+    // deliberately no catch clause: assertion failures and unexpected exceptions must fail the test
+    // instead of merely being printed
+    try {
+      val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+      // create leader log
+      val log0 = getLogWithHW(5L)
+
+      // create leader and follower replicas
+      val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+      val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+
+      partition0.inSyncReplicas = Set(followerReplicaPartition0, leaderReplicaPartition0)
+      // set the leader and its hw and the hw update time
+      partition0.leaderId(Some(configs.head.brokerId))
+      partition0.leaderHW(Some(5L))
+
+      // set the leo for non-leader replicas to something low
+      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+
+      val log1 = getLogWithHW(15L)
+      // create leader and follower replicas for partition 1
+      val partition1 = replicaManager.getOrCreatePartition(topic, 1, configs.map(_.brokerId).toSet)
+      val leaderReplicaPartition1 = replicaManager.addLocalReplica(topic, 1, log1, configs.map(_.brokerId).toSet)
+      // the follower of partition 1 has to be registered against partition1, not partition0
+      val followerReplicaPartition1 = replicaManager.addRemoteReplica(topic, 1, configs.last.brokerId, partition1)
+
+      partition1.inSyncReplicas = Set(followerReplicaPartition1, leaderReplicaPartition1)
+      // set the leader and its hw and the hw update time
+      partition1.leaderId(Some(configs.head.brokerId))
+      partition1.leaderHW(Some(15L))
+
+      // set the leo for non-leader replicas to something low
+      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+
+      time.sleep(150)
+      leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
+      leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+      time.sleep(10)
+      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+
+      val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+      assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+      val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+      assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
+
+      EasyMock.verify(log0)
+      EasyMock.verify(log1)
+    } finally {
+      replicaManager.close()
+    }
+  }
+
+  /**
+   * Builds a Partition whose assigned replicas are one leader replica backed by localLog plus one log-less
+   * follower replica per remaining broker config, with all of them placed in the ISR.
+   *
+   * @param leaderId broker id of the leader replica
+   * @param localLog log handed to the leader replica
+   * @param leaderHW high watermark set on the partition for the leader
+   */
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+                                               localLog: Log, leaderHW: Long): Partition = {
+    val partition = new Partition(topic, partitionId, time)
+    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+
+    val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+    partition.assignedReplicas(Some(allReplicas.toSet))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderId(Some(leaderId))
+    partition.leaderHW(Some(leaderHW))
+    partition
+  }
+
+  /**
+   * Returns a replayed EasyMock Log that expects 6 calls to logEndOffset, each answering hw, and a single
+   * setHW(hw) call. Callers are expected to EasyMock.verify the returned mock.
+   */
+  private def getLogWithHW(hw: Long): Log = {
+    val log1 = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
+    EasyMock.expect(log1.setHW(hw)).times(1)
+    EasyMock.replay(log1)
+
+    log1
+  }
+
+  /** Creates one Replica without a log for every broker config whose id differs from leaderId. */
+  private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+    configs.filter(_.brokerId != leaderId).map { config =>
+      new Replica(config.brokerId, partition, topic)
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Fri Jun  1 01:53:19 2012
@@ -51,6 +51,7 @@ class LeaderElectionTest extends JUnit3S
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
 
     servers ++= List(server1, server2)
+    try {
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -59,7 +60,7 @@ class LeaderElectionTest extends JUnit3S
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -68,7 +69,7 @@ class LeaderElectionTest extends JUnit3S
     servers.head.shutdown()
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     Thread.sleep(zookeeper.tickTime)
@@ -85,9 +86,14 @@ class LeaderElectionTest extends JUnit3S
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
-    // shutdown the servers and delete data hosted on them
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
+    }catch {
+      case e => error("Error while running leader election test ", e)
+    } finally {
+      // shutdown the servers and delete data hosted on them
+      servers.map(server => server.shutdown())
+      servers.map(server => Utils.rm(server.config.logDir))
+
+    }
   }
 
   // Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,17 +111,17 @@ class LeaderElectionTest extends JUnit3S
 
       var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
       assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
 
     }finally {
       TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Fri Jun  1 01:53:19 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import kafka.producer.ProducerData
+import kafka.serializer.StringEncoder
+import kafka.admin.CreateTopicCommand
+import kafka.cluster.{Replica, Partition, Broker}
+import kafka.utils.{MockTime, TestUtils}
+import junit.framework.Assert._
+import java.io.File
+import kafka.log.Log
+
+/**
+ * Starts two brokers on top of the ZooKeeper test harness and checks that a ReplicaFetcherThread pointed at
+ * the first broker copies the messages produced to a topic into a local replica log.
+ */
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
+  val props = createBrokerConfigs(2)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var brokers: Seq[KafkaServer] = null
+  val topic = "foobar"
+
+  override def setUp() {
+    super.setUp()
+    brokers = configs.map(config => TestUtils.createServer(config))
+  }
+
+  override def tearDown() {
+    // mirror image of setUp: stop the brokers first, then tear down the harness they were started on
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  def testReplicaFetcherThread() {
+    val partition = 0
+    val testMessageList = List("test1", "test2", "test3", "test4")
+    val leaderBrokerId = configs.head.brokerId
+    val followerBrokerId = configs.last.brokerId
+    val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+
+    // create a topic and partition
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+
+    // send test messages to leader; the producer is not needed afterwards, so release it right away
+    val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+    try {
+      producer.send(new ProducerData[String, String](topic, "test", testMessageList))
+    } finally {
+      producer.close
+    }
+
+    // create a tmp directory
+    val tmpLogDir = TestUtils.tempDir()
+    val replicaLogDir = new File(tmpLogDir, topic + "-" + partition)
+    replicaLogDir.mkdirs()
+    val replicaLog = new Log(replicaLogDir, 500, 500, false)
+
+    // close the replica log even when starting the fetcher or the assertion below fails
+    try {
+      // create replica fetch thread
+      val time = new MockTime
+      val testPartition = new Partition(topic, partition, time)
+      testPartition.leaderId(Some(leaderBrokerId))
+      val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
+      val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
+
+      // start a replica fetch thread to the above broker
+      replicaFetchThread.start()
+
+      // give the fetcher some time to pull the messages before stopping it
+      Thread.sleep(700)
+      replicaFetchThread.shutdown()
+
+      // NOTE(review): 60 is presumably the log end offset after the four test messages were replicated - confirm
+      assertEquals(60L, testReplica.log.get.logEndOffset)
+    } finally {
+      replicaLog.close()
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Fri Jun  1 01:53:19 2012
@@ -20,7 +20,6 @@ package kafka.server
 import scala.collection._
 import org.junit.{After, Before, Test}
 import junit.framework.Assert._
-import kafka.server._
 import kafka.message._
 import kafka.api._
 import kafka.utils.TestUtils

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Jun  1 01:53:19 2012
@@ -32,10 +32,10 @@ import kafka.cluster.Broker
 import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
 import scala.collection.Map
-import kafka.serializer.Encoder
-import kafka.api.{ProducerRequest, TopicData, PartitionData}
+import kafka.api.{TopicData, PartitionData}
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
+import kafka.serializer.{DefaultEncoder, Encoder}
 
 /**
  * Utility functions to help with testing
@@ -122,6 +122,7 @@ object TestUtils extends Logging {
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval", "1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("replica.socket.timeout.ms", "1500")
     props
   }
   
@@ -280,12 +281,13 @@ object TestUtils extends Logging {
   /**
-   * Create a producer for the given host and port
+   * Create a producer for the given ZooKeeper connect string; only the encoder's class name is passed on (serializer.class)
    */
-  def createProducer[K, V](zkConnect: String): Producer[K, V] = {
+  def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
     val props = new Properties()
     props.put("zk.connect", zkConnect)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
+    props.put("serializer.class", encoder.getClass.getName) // binary name; getCanonicalName is null for anonymous/local classes
     new Producer[K, V](new ProducerConfig(props))
   }