You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/04 23:37:01 UTC

kafka git commit: KAFKA-5355; Test cases to ensure isolation level propagated in delayed fetch

Repository: kafka
Updated Branches:
  refs/heads/trunk c7bc8f7d8 -> 3557f097b


KAFKA-5355; Test cases to ensure isolation level propagated in delayed fetch

Also includes a few logging improvements in the consumer Fetcher and the producer TransactionManager.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3230 from hachikuji/KAFKA-5355-TESTS


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3557f097
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3557f097
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3557f097

Branch: refs/heads/trunk
Commit: 3557f097b81edd6518100d79b442223fadf9f81f
Parents: c7bc8f7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 5 00:36:16 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 5 00:36:26 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  19 +-
 .../producer/internals/TransactionManager.java  |   8 +
 .../common/requests/TxnOffsetCommitRequest.java |   4 +
 .../group/GroupMetadataManager.scala            |   5 +-
 core/src/main/scala/kafka/log/Log.scala         |   9 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     |  40 +++-
 .../kafka/api/TransactionsTest.scala            | 108 +++++++---
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   4 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   3 +-
 .../group/GroupMetadataManagerTest.scala        |   3 +-
 .../TransactionStateManagerTest.scala           |   4 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  11 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  56 +++---
 .../kafka/server/ReplicaManagerQuotasTest.scala |  20 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 197 ++++++++++---------
 .../unit/kafka/server/SimpleFetchTest.scala     |  26 ++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 19 files changed, 323 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9be8aa3..e73ff4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -198,7 +198,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             final FetchRequest.Builder request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
-            log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
+            log.debug("Sending {} fetch for partitions {} to broker {}", isolationLevel, request.fetchData().keySet(),
+                    fetchTarget);
             client.send(fetchTarget, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
@@ -221,9 +222,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                                 long fetchOffset = request.fetchData().get(partition).fetchOffset;
                                 FetchResponse.PartitionData fetchData = entry.getValue();
 
-                                log.debug("Fetch at offset {} for partition {} returned fetch data {}", fetchOffset,
-                                        partition, fetchData);
-
+                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+                                        isolationLevel, fetchOffset, partition, fetchData);
                                 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                         resp.requestHeader().apiVersion()));
                             }
@@ -782,8 +782,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 }
 
                 long position = this.subscriptions.position(partition);
-                fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize));
-                log.debug("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
+                fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
+                        this.fetchSize));
+                log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,
+                        partition, position, node);
             } else {
                 log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
             }
@@ -1038,8 +1040,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         if (containsAbortMarker(currentBatch)) {
                             abortedProducerIds.remove(producerId);
                         } else if (isBatchAborted(currentBatch)) {
-                            log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition {}",
-                                    producerId, currentBatch.baseOffset(), partition);
+                            log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
+                                            "offsets {} to {}",
+                                    partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                             nextFetchOffset = currentBatch.nextOffset();
                             continue;
                         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index dad6b5d..c081b23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -212,6 +212,7 @@ public class TransactionManager {
             throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
                     "active transaction");
 
+        log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId);
         AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
                 producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
@@ -226,6 +227,7 @@ public class TransactionManager {
         if (partitionsInTransaction.contains(topicPartition))
             return;
 
+        log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
         newPartitionsInTransaction.add(topicPartition);
     }
 
@@ -749,6 +751,7 @@ public class TransactionManager {
                 abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
             } else {
                 Set<TopicPartition> addedPartitions = errors.keySet();
+                log.debug("{}Successfully added partitions {} to transaction", logPrefix, addedPartitions);
                 partitionsInTransaction.addAll(addedPartitions);
                 pendingPartitionsInTransaction.removeAll(addedPartitions);
                 transactionStarted = true;
@@ -886,6 +889,9 @@ public class TransactionManager {
             Errors error = addOffsetsToTxnResponse.error();
 
             if (error == Errors.NONE) {
+                log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
+                        builder.consumerGroupId());
+
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
                 transactionStarted = true;
@@ -946,6 +952,8 @@ public class TransactionManager {
                 TopicPartition topicPartition = entry.getKey();
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
+                    log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix,
+                            builder.offsets(), builder.consumerGroupId());
                     pendingTxnOffsetCommits.remove(topicPartition);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 2ea8ecf..20522af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -59,6 +59,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
             return consumerGroupId;
         }
 
+        public Map<TopicPartition, CommittedOffset> offsets() {
+            return offsets;
+        }
+
         @Override
         public TxnOffsetCommitRequest build(short version) {
             return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index b23514a..db3d936 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -484,9 +484,8 @@ class GroupMetadataManager(brokerId: Int,
         val removedGroups = mutable.Set[String]()
 
         while (currOffset < highWaterMark && !shuttingDown.get()) {
-          val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true,
-            isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-
+          val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None,
+            minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b9968a2..5522508 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -830,6 +830,11 @@ class Log(@volatile var dir: File,
     }
   }
 
+  private[log] def readUncommitted(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None,
+                                   minOneMessage: Boolean = false): FetchDataInfo = {
+    read(startOffset, maxLength, maxOffset, minOneMessage, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+  }
+
   /**
    * Read messages from the log.
    *
@@ -849,7 +854,7 @@ class Log(@volatile var dir: File,
    * @return The fetch data information including fetch starting offset metadata and messages read.
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
-           isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): FetchDataInfo = {
+           isolationLevel: IsolationLevel): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
     // Because we don't use lock for reading, the synchronization is a little bit tricky.
@@ -1001,7 +1006,7 @@ class Log(@volatile var dir: File,
    */
   def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
     try {
-      val fetchDataInfo = read(offset, 1)
+      val fetchDataInfo = readUncommitted(offset, 1)
       fetchDataInfo.fetchOffsetMetadata
     } catch {
       case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index fb89800..c5cdf57 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -153,8 +153,7 @@ class DelayedFetch(delayMs: Long,
       hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
       readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
       quota = quota,
-      isolationLevel = isolationLevel
-    )
+      isolationLevel = isolationLevel)
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
       tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5e1c9c1..a8d3e94 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -129,7 +129,36 @@ class ReplicaManager(val config: KafkaConfig,
                      quotaManager: ReplicationQuotaManager,
                      val brokerTopicStats: BrokerTopicStats,
                      val metadataCache: MetadataCache,
-                     threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+                     val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                     val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                     val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                     threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           zkUtils: ZkUtils,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManager: ReplicationQuotaManager,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: MetadataCache,
+           threadNamePrefix: Option[String] = None) {
+    this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
+      quotaManager, brokerTopicStats, metadataCache,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      threadNamePrefix)
+  }
+
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
@@ -146,13 +175,6 @@ class ReplicaManager(val config: KafkaConfig,
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
   private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
 
-  val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
-    purgatoryName = "Produce", brokerId = localBrokerId, purgeInterval = config.producerPurgatoryPurgeIntervalRequests)
-  val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
-    purgatoryName = "Fetch", brokerId = localBrokerId, purgeInterval = config.fetchPurgatoryPurgeIntervalRequests)
-  val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords](
-    purgatoryName = "DeleteRecords", brokerId = localBrokerId, purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests)
-
   val leaderCount = newGauge(
     "LeaderCount",
     new Gauge[Int] {
@@ -641,7 +663,7 @@ class ReplicaManager(val config: KafkaConfig,
                        hardMaxBytesLimit: Boolean,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                        quota: ReplicaQuota,
-                       isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Seq[(TopicPartition, LogReadResult)] = {
+                       isolationLevel: IsolationLevel): Seq[(TopicPartition, LogReadResult)] = {
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 9aceec8..4593d9c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
-import org.apache.kafka.clients.producer.KafkaProducer
+import kafka.utils.TestUtils.consumeRecords
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -32,7 +33,7 @@ import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.collection.mutable.Buffer
 import scala.concurrent.ExecutionException
 
 class TransactionsTest extends KafkaServerTestHarness {
@@ -49,7 +50,7 @@ class TransactionsTest extends KafkaServerTestHarness {
   val nonTransactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
+    TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps()))
   }
 
   @Before
@@ -62,11 +63,11 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
 
     for (_ <- 0 until transactionalProducerCount)
-      transactionalProducers += TestUtils.createTransactionalProducer("transactional-producer", servers)
+      createTransactionalProducer("transactional-producer")
     for (_ <- 0 until transactionalConsumerCount)
-      transactionalConsumers += transactionalConsumer("transactional-group")
+      createReadCommittedConsumer("transactional-group")
     for (_ <- 0 until nonTransactionalConsumerCount)
-      nonTransactionalConsumers += nonTransactionalConsumer("non-transactional-group")
+      createReadUncommittedConsumer("non-transactional-group")
   }
 
   @After
@@ -79,9 +80,9 @@ class TransactionsTest extends KafkaServerTestHarness {
 
   @Test
   def testBasicTransactions() = {
-    val producer = transactionalProducers(0)
-    val consumer = transactionalConsumers(0)
-    val unCommittedConsumer = nonTransactionalConsumers(0)
+    val producer = transactionalProducers.head
+    val consumer = transactionalConsumers.head
+    val unCommittedConsumer = nonTransactionalConsumers.head
 
     producer.initTransactions()
 
@@ -99,12 +100,12 @@ class TransactionsTest extends KafkaServerTestHarness {
     consumer.subscribe(List(topic1, topic2).asJava)
     unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
 
-    val records = pollUntilExactlyNumRecords(consumer, 2)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
 
-    val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+    val allRecords = consumeRecords(unCommittedConsumer, 4)
     val expectedValues = List("1", "2", "3", "4").toSet
     allRecords.foreach { record =>
       assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
@@ -112,6 +113,53 @@ class TransactionsTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testReadCommittedConsumerShouldNotSeeUndecidedData(): Unit = {
+    val producer1 = transactionalProducers.head
+    val producer2 = createTransactionalProducer("other")
+    val readCommittedConsumer = transactionalConsumers.head
+    val readUncommittedConsumer = nonTransactionalConsumers.head
+
+    producer1.initTransactions()
+    producer2.initTransactions()
+
+    producer1.beginTransaction()
+    producer2.beginTransaction()
+    producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "1".getBytes))
+    producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "1".getBytes))
+    producer2.flush()
+
+    producer1.send(new ProducerRecord(topic1, 0, "a".getBytes, "1".getBytes))
+    producer1.send(new ProducerRecord(topic1, 0, "b".getBytes, "2".getBytes))
+    producer1.send(new ProducerRecord(topic2, 0, "c".getBytes, "3".getBytes))
+    producer1.send(new ProducerRecord(topic2, 0, "d".getBytes, "4".getBytes))
+    producer1.flush()
+
+    producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "2".getBytes))
+    producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "2".getBytes))
+    producer2.commitTransaction()
+
+    // ensure the records are visible to the read uncommitted consumer
+    readUncommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+    consumeRecords(readUncommittedConsumer, 8)
+    readUncommittedConsumer.unsubscribe()
+
+    // we should only see the first two records which come before the undecided second transaction
+    readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+    val records = consumeRecords(readCommittedConsumer, 2)
+    records.foreach { record =>
+      assertEquals("x", new String(record.key))
+      assertEquals("1", new String(record.value))
+    }
+
+    // even if we seek to the end, we should not be able to see the undecided data
+    assertEquals(2, readCommittedConsumer.assignment.size)
+    readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
+    readCommittedConsumer.assignment.asScala.foreach { tp =>
+      assertEquals(1L, readCommittedConsumer.position(tp))
+    }
+  }
+
+  @Test
   def testSendOffsets() = {
     // The basic plan for the test is as follows:
     //  1. Seed topic1 with 1000 unique, numbered, messages.
@@ -129,7 +177,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     val producer = transactionalProducers(0)
 
-    val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+    val consumer = createReadCommittedConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
     consumer.subscribe(List(topic1).asJava)
     producer.initTransactions()
 
@@ -208,7 +256,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     producer2.commitTransaction()  // ok
 
-    val records = pollUntilExactlyNumRecords(consumer, 2)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
@@ -246,7 +294,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     producer2.commitTransaction()  // ok
 
-    val records = pollUntilExactlyNumRecords(consumer, 2)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
@@ -290,7 +338,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     producer2.commitTransaction() // ok
 
-    val records = pollUntilExactlyNumRecords(consumer, 2)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
@@ -336,7 +384,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     producer2.commitTransaction()  // ok
 
-    val records = pollUntilExactlyNumRecords(consumer, 2)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
@@ -345,7 +393,7 @@ class TransactionsTest extends KafkaServerTestHarness {
   private def serverProps() = {
     val serverProps = new Properties()
     serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
-    // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
+    // Set a smaller value for the number of partitions for the __consumer_offsets topic
     // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
     serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
     serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
@@ -354,33 +402,35 @@ class TransactionsTest extends KafkaServerTestHarness {
     serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
     serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
     serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     serverProps
   }
 
-  private def transactionalConsumer(group: String = "group", maxPollRecords: Int = 500) = {
+  private def createReadCommittedConsumer(group: String = "group", maxPollRecords: Int = 500) = {
     val props = new Properties()
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
-    TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
       groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+    transactionalConsumers += consumer
+    consumer
   }
 
-  private def nonTransactionalConsumer(group: String = "group") = {
+  private def createReadUncommittedConsumer(group: String = "group") = {
     val props = new Properties()
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
       groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+    nonTransactionalConsumers += consumer
+    consumer
   }
 
-  private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
-    val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
-      records.size == numRecords
-    }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
-    records
+  private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
+    val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+    transactionalProducers += producer
+    producer
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index cd0c74b..147e84a 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -117,9 +117,9 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
                                                              quotaManager: ReplicationQuotaManager) =
             new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
               override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
+                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
                 val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-                fetcherThread(new FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
+                fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
                   time, quotaManager))
               }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 7583d45..6c134ac 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -25,6 +25,7 @@ import kafka.server.BrokerTopicStats
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -99,7 +100,7 @@ object StressTestLog {
     @volatile var offset = 0
     override def work() {
       try {
-        log.read(offset, 1024, Some(offset+1)).records match {
+        log.read(offset, 1024, Some(offset+1), isolationLevel = IsolationLevel.READ_UNCOMMITTED).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
             val first = read.batches.iterator.next()
             require(first.lastOffset == offset, "We should either read nothing or the message we asked for.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 4c2eb27..6245e85 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1310,7 +1310,8 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
-    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
     EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
       .andReturn(records.buffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 54246c4..b5d7903 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -517,8 +517,8 @@ class TransactionStateManagerTest {
     EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
 
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
-    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true),
-      EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
     EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
       .andReturn(records.buffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index ee46341..3d19442 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,
           new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
 
-    def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next()
+    def readBatch(offset: Int) = log.readUncommitted(offset, 4096).records.batches.iterator.next()
 
     if (!brokerCompression.equals("producer")) {
       val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a6fe2e4..5b29471 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,10 +21,12 @@ import java.io._
 import java.util.Properties
 
 import kafka.common._
+import kafka.server.FetchDataInfo
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -105,10 +107,10 @@ class LogManagerTest {
 
     // there should be a log file, two indexes, and the leader epoch checkpoint
     assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
-    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset+1, 1024).records.sizeInBytes)
 
     try {
-      log.read(0, 1024)
+      log.readUncommitted(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
       case _: OffsetOutOfRangeException => // This is good.
@@ -154,9 +156,9 @@ class LogManagerTest {
     // there should be a log file, two indexes (the txn index is created lazily),
     // the leader epoch checkpoint and two pid mapping files (one for the active and previous segments)
     assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length)
-    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes)
     try {
-      log.read(0, 1024)
+      log.readUncommitted(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
       case _: OffsetOutOfRangeException => // This is good.
@@ -302,7 +304,6 @@ class LogManagerTest {
     }
   }
 
-
   private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = {
     TestUtils.createLogManager(
       defaultConfig = logConfig,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6fcc7ae..7b67857 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -551,7 +551,7 @@ class LogTest {
     log.appendAsFollower(memoryRecords)
     log.flush()
 
-    val fetchedData = log.read(0, Int.MaxValue)
+    val fetchedData = log.readUncommitted(0, Int.MaxValue)
 
     val origIterator = memoryRecords.batches.iterator()
     for (batch <- fetchedData.records.batches.asScala) {
@@ -736,13 +736,14 @@ class LogTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0)
 
     for(i <- values.indices) {
-      val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+      val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next()
       assertEquals("Offset read should match order appended.", i, read.lastOffset)
       val actual = read.iterator.next()
       assertNull("Key should be null", actual.key)
       assertEquals("Values not equal", ByteBuffer.wrap(values(i)), actual.value)
     }
-    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(values.length, 100, None).records.batches.asScala.size)
+    assertEquals("Reading beyond the last message returns nothing.", 0,
+      log.readUncommitted(values.length, 100, None).records.batches.asScala.size)
   }
 
   /**
@@ -763,7 +764,7 @@ class LogTest {
       log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
-      val read = log.read(i, 100, None).records.records.iterator.next()
+      val read = log.readUncommitted(i, 100, None).records.records.iterator.next()
       assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
       assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
     }
@@ -790,7 +791,7 @@ class LogTest {
     log.logSegments.head.truncateTo(1)
 
     assertEquals("A read should now return the last message in the log", log.logEndOffset - 1,
-      log.read(1, 200, None).records.batches.iterator.next().lastOffset)
+      log.readUncommitted(1, 200, None).records.batches.iterator.next().lastOffset)
   }
 
   @Test
@@ -809,16 +810,16 @@ class LogTest {
     for (i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
       val reads = Seq(
-        log.read(i, 1, minOneMessage = true),
-        log.read(i, 100, minOneMessage = true),
-        log.read(i, 100, Some(10000), minOneMessage = true)
+        log.readUncommitted(i, 1, minOneMessage = true),
+        log.readUncommitted(i, 100, minOneMessage = true),
+        log.readUncommitted(i, 100, Some(10000), minOneMessage = true)
       ).map(_.records.records.iterator.next())
       reads.foreach { read =>
         assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
         assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
       }
 
-      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq)
+      assertEquals(Seq.empty, log.readUncommitted(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq)
     }
   }
 
@@ -836,14 +837,14 @@ class LogTest {
       log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
 
     for (i <- 50 until messageIds.max) {
-      assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records)
+      assertEquals(MemoryRecords.EMPTY, log.readUncommitted(i, 0).records)
 
       // we return an incomplete message instead of an empty one for the case below
       // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is
       // larger than the fetch size
       // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty
       // partition
-      val fetchInfo = log.read(i, 1)
+      val fetchInfo = log.readUncommitted(i, 1)
       assertTrue(fetchInfo.firstEntryIncomplete)
       assertTrue(fetchInfo.records.isInstanceOf[FileRecords])
       assertEquals(1, fetchInfo.records.sizeInBytes)
@@ -867,23 +868,25 @@ class LogTest {
       brokerTopicStats = brokerTopicStats, time = time)
     log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
 
-    assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
+    assertEquals("Reading at the log end offset should produce 0 byte read.", 0,
+      log.readUncommitted(1025, 1000).records.sizeInBytes)
 
     try {
-      log.read(0, 1000)
+      log.readUncommitted(0, 1000)
       fail("Reading below the log start offset should throw OffsetOutOfRangeException")
     } catch {
       case _: OffsetOutOfRangeException => // This is good.
     }
 
     try {
-      log.read(1026, 1000)
+      log.readUncommitted(1026, 1000)
       fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
     } catch {
       case _: OffsetOutOfRangeException => // This is good.
     }
 
-    assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).records.sizeInBytes)
+    assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0,
+      log.readUncommitted(1025, 1000, Some(1024)).records.sizeInBytes)
   }
 
   /**
@@ -906,7 +909,7 @@ class LogTest {
     /* do successive reads to ensure all our messages are there */
     var offset = 0L
     for(i <- 0 until numMessages) {
-      val messages = log.read(offset, 1024*1024).records.batches
+      val messages = log.readUncommitted(offset, 1024*1024).records.batches
       val head = messages.iterator.next()
       assertEquals("Offsets not equal", offset, head.lastOffset)
 
@@ -917,7 +920,8 @@ class LogTest {
       assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp)
       offset = head.lastOffset + 1
     }
-    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
+    val lastRead = log.readUncommitted(startOffset = numMessages, maxLength = 1024*1024,
+      maxOffset = Some(numMessages + 1)).records
     assertEquals("Should be no more messages", 0, lastRead.records.asScala.size)
 
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
@@ -941,7 +945,7 @@ class LogTest {
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)), leaderEpoch = 0)
 
-    def read(offset: Int) = log.read(offset, 4096).records.records
+    def read(offset: Int) = log.readUncommitted(offset, 4096).records.records
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).iterator.next().offset)
@@ -1152,7 +1156,7 @@ class LogTest {
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
     }
-    messages.foreach(log.appendAsFollower(_))
+    messages.foreach(log.appendAsFollower)
     val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
     assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries)
     assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}",
@@ -1190,7 +1194,7 @@ class LogTest {
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
+      assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -1215,7 +1219,8 @@ class LogTest {
     var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
       brokerTopicStats = brokerTopicStats, time = time)
     for(i <- 0 until numMessages)
-      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
+        timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
@@ -1226,11 +1231,10 @@ class LogTest {
     log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler,
       brokerTopicStats = brokerTopicStats, time = time)
     val segArray = log.logSegments.toArray
-    for (i <- 0 until segArray.size - 1) {
+    for (i <- segArray.indices.init) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
       assertEquals("The time index file size should be 0", 0, segArray(i).timeIndex.file.length)
     }
-
   }
 
   /**
@@ -1272,7 +1276,7 @@ class LogTest {
       brokerTopicStats = brokerTopicStats, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
+      assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -1544,7 +1548,7 @@ class LogTest {
                       brokerTopicStats = brokerTopicStats,
                       time = time)
     log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
-    val head = log.read(0, 4096, None).records.records.iterator.next()
+    val head = log.readUncommitted(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
     assertTrue("Message payload should be null.", !head.hasValue)
   }
@@ -2032,7 +2036,7 @@ class LogTest {
 
     //Then leader epoch should be set on messages
     for (i <- records.indices) {
-      val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+      val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next()
       assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index e770106..2ee08a2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -42,7 +42,7 @@ class ReplicaManagerQuotasTest {
   val topicPartition1 = new TopicPartition("test-topic", 1)
   val topicPartition2 = new TopicPartition("test-topic", 2)
   val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 0, 100), topicPartition2 -> new PartitionData(0, 0, 100))
-  var replicaManager: ReplicaManager = null
+  var replicaManager: ReplicaManager = _
 
   @Test
   def shouldExcludeSubsequentThrottledPartitions(): Unit = {
@@ -61,7 +61,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
 
@@ -86,7 +87,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -110,7 +112,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
@@ -134,7 +137,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
 
@@ -153,14 +157,16 @@ class ReplicaManagerQuotasTest {
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
 
     //if we ask for len 1 return a message
-    expect(log.read(anyObject(), geq(1), anyObject(), anyObject(), anyObject())).andReturn(
+    expect(log.read(anyObject(), geq(1), anyObject(), anyObject(),
+      EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, record)
       )).anyTimes()
 
     //if we ask for len = 0, return 0 messages
-    expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(), anyObject())).andReturn(
+    expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(),
+      EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.EMPTY

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6efd0a3..a33968a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.log.LogConfig
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import TestUtils.createBroker
+import kafka.utils.timer.MockTimer
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -165,26 +166,12 @@ class ReplicaManagerTest {
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
-      // Append a message.
-      rm.appendRecords(
-        timeout = 1000,
-        requiredAcks = -1,
-        internalTopicsAllowed = false,
-        isFromClient = true,
-        entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(CompressionType.NONE,
-          new SimpleRecord("first message".getBytes()))),
-        responseCallback = produceCallback)
+      val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
+      appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback)
 
       // Fetch some messages
-      rm.fetchMessages(
-        timeout = 1000,
-        replicaId = -1,
-        fetchMinBytes = 100000,
-        fetchMaxBytes = Int.MaxValue,
-        hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        responseCallback = fetchCallback,
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+      fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), fetchCallback,
+        minBytes = 100000)
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
@@ -214,23 +201,32 @@ class ReplicaManagerTest {
     EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
     EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
     EasyMock.replay(metadataCache)
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+
+    val timer = new MockTimer
+    val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+      purgatoryName = "Produce", timer, reaperEnabled = false)
+    val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+      purgatoryName = "Fetch", timer, reaperEnabled = false)
+    val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
+      purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
+
+    val replicaManager = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      metadataCache, Option(this.getClass.getName))
+      metadataCache, mockProducePurgatory, mockFetchPurgatory, mockDeleteRecordsPurgatory, Option(this.getClass.getName))
 
     try {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
 
-      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
+      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
-      rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
+      replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) =
         responseStatus.values.foreach { status =>
@@ -245,13 +241,7 @@ class ReplicaManagerTest {
       for (sequence <- 0 until numRecords) {
         val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
-        rm.appendRecords(
-          timeout = 1000,
-          requiredAcks = -1,
-          internalTopicsAllowed = false,
-          isFromClient = true,
-          entriesPerPartition = Map(new TopicPartition(topic, 0) -> records),
-          responseCallback = produceCallback)
+        appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback)
       }
 
       var fetchCallbackFired = false
@@ -263,26 +253,22 @@ class ReplicaManagerTest {
         fetchCallbackFired = true
       }
 
-      def fetchMessages(fetchInfos: Seq[(TopicPartition, PartitionData)],
-                        isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
-        rm.fetchMessages(
-          timeout = 1000,
-          replicaId = 1,
-          fetchMinBytes = 0,
-          fetchMaxBytes = Int.MaxValue,
-          hardMaxBytesLimit = false,
-          fetchInfos = fetchInfos,
-          responseCallback = fetchCallback,
-          isolationLevel = isolationLevel)
-      }
-
       // fetch as follower to advance the high watermark
-      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)),
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+      fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)),
+        fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
 
       // fetch should return empty since LSO should be stuck at 0
-      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        isolationLevel = IsolationLevel.READ_COMMITTED)
+      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
+      assertTrue(fetchCallbackFired)
+      assertEquals(Errors.NONE, fetchError)
+      assertTrue(fetchedRecords.batches.asScala.isEmpty)
+      fetchCallbackFired = false
+
+      // delayed fetch should timeout and return nothing
+      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000)
+      timer.advanceClock(1001)
 
       assertTrue(fetchCallbackFired)
       assertEquals(Errors.NONE, fetchError)
@@ -292,18 +278,13 @@ class ReplicaManagerTest {
       // now commit the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
       val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      rm.appendRecords(
-        timeout = 1000,
-        requiredAcks = -1,
-        internalTopicsAllowed = false,
-        isFromClient = false,
-        entriesPerPartition = Map(new TopicPartition(topic, 0) -> commitRecordBatch),
-        responseCallback = produceCallback)
+      appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> commitRecordBatch), produceCallback,
+        isFromClient = false)
 
       // the LSO has advanced, but the appended commit marker has not been replicated, so
       // none of the data from the transaction should be visible yet
-      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        isolationLevel = IsolationLevel.READ_COMMITTED)
+      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
 
       assertTrue(fetchCallbackFired)
       assertEquals(Errors.NONE, fetchError)
@@ -311,18 +292,18 @@ class ReplicaManagerTest {
       fetchCallbackFired = false
 
       // fetch as follower to advance the high watermark
-      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+      fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
+        fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
 
       // now all of the records should be fetchable
-      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        isolationLevel = IsolationLevel.READ_COMMITTED)
+      fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
 
       assertTrue(fetchCallbackFired)
       assertEquals(Errors.NONE, fetchError)
       assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size)
     } finally {
-      rm.shutdown(checkpointHW = false)
+      replicaManager.shutdown(checkpointHW = false)
     }
   }
 
@@ -349,10 +330,10 @@ class ReplicaManagerTest {
     try {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
-      
+
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
-      
+
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
@@ -361,17 +342,13 @@ class ReplicaManagerTest {
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
-      
+
       // Append a couple of messages.
-      for(i <- 1 to 2)
-        rm.appendRecords(
-          timeout = 1000,
-          requiredAcks = -1,
-          internalTopicsAllowed = false,
-          isFromClient = true,
-          entriesPerPartition = Map(new TopicPartition(topic, 0) -> TestUtils.singletonRecords("message %d".format(i).getBytes)),
-          responseCallback = produceCallback)
-      
+      for(i <- 1 to 2) {
+        val records = TestUtils.singletonRecords(s"message $i".getBytes)
+        appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback)
+      }
+
       var fetchCallbackFired = false
       var fetchError = Errors.NONE
       var fetchedRecords: Records = null
@@ -380,35 +357,16 @@ class ReplicaManagerTest {
         fetchedRecords = responseStatus.map(_._2).head.records
         fetchCallbackFired = true
       }
-      
+
       // Fetch a message above the high watermark as a follower
-      rm.fetchMessages(
-        timeout = 1000,
-        replicaId = 1,
-        fetchMinBytes = 0,
-        fetchMaxBytes = Int.MaxValue,
-        hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
-        responseCallback = fetchCallback,
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-        
-      
+      fetchAsFollower(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback)
       assertTrue(fetchCallbackFired)
       assertEquals("Should not give an exception", Errors.NONE, fetchError)
       assertTrue("Should return some data", fetchedRecords.batches.iterator.hasNext)
       fetchCallbackFired = false
-      
+
       // Fetch a message above the high watermark as a consumer
-      rm.fetchMessages(
-        timeout = 1000,
-        replicaId = -1,
-        fetchMinBytes = 0,
-        fetchMaxBytes = Int.MaxValue,
-        hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
-        responseCallback = fetchCallback,
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-          
+      fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback)
       assertTrue(fetchCallbackFired)
       assertEquals("Should not give an exception", Errors.NONE, fetchError)
       assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
@@ -416,4 +374,51 @@ class ReplicaManagerTest {
       rm.shutdown(checkpointHW = false)
     }
   }
+
+  private def appendRecords(replicaManager: ReplicaManager,
+                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            isFromClient: Boolean = true): Unit = {
+    replicaManager.appendRecords(
+      timeout = 1000,
+      requiredAcks = -1,
+      internalTopicsAllowed = false,
+      isFromClient = isFromClient,
+      entriesPerPartition = entriesPerPartition,
+      responseCallback = responseCallback)
+  }
+
+  private def fetchAsConsumer(replicaManager: ReplicaManager,
+                              fetchInfos: Seq[(TopicPartition, PartitionData)],
+                              fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                              minBytes: Int = 0,
+                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
+    fetchMessages(replicaManager, replicaId = -1, fetchInfos, fetchCallback, minBytes, isolationLevel)
+  }
+
+  private def fetchAsFollower(replicaManager: ReplicaManager,
+                              fetchInfos: Seq[(TopicPartition, PartitionData)],
+                              fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                              minBytes: Int = 0,
+                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
+    fetchMessages(replicaManager, replicaId = 1, fetchInfos, fetchCallback, minBytes, isolationLevel)
+  }
+
+  private def fetchMessages(replicaManager: ReplicaManager,
+                            replicaId: Int,
+                            fetchInfos: Seq[(TopicPartition, PartitionData)],
+                            fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                            minBytes: Int,
+                            isolationLevel: IsolationLevel): Unit = {
+    replicaManager.fetchMessages(
+      timeout = 1000,
+      replicaId = replicaId,
+      fetchMinBytes = minBytes,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      fetchInfos = fetchInfos,
+      responseCallback = fetchCallback,
+      isolationLevel = isolationLevel)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ac851d8..dad4b78 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -62,7 +62,7 @@ class SimpleFetchTest {
 
   val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize))
 
-  var replicaManager: ReplicaManager = null
+  var replicaManager: ReplicaManager = _
 
   @Before
   def setUp() {
@@ -80,13 +80,23 @@ class SimpleFetchTest {
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
-    EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true, IsolationLevel.READ_UNCOMMITTED)).andReturn(
-      FetchDataInfo(
+    EasyMock.expect(log.read(
+      startOffset = 0,
+      maxLength = fetchSize,
+      maxOffset = Some(partitionHW),
+      minOneMessage = true,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED))
+      .andReturn(FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
       )).anyTimes()
-    EasyMock.expect(log.read(0, fetchSize, None, true, IsolationLevel.READ_UNCOMMITTED)).andReturn(
-      FetchDataInfo(
+    EasyMock.expect(log.read(
+      startOffset = 0,
+      maxLength = fetchSize,
+      maxOffset = None,
+      minOneMessage = true,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED))
+      .andReturn(FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
       )).anyTimes()
@@ -162,7 +172,8 @@ class SimpleFetchTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = UnboundedQuota).find(_._1 == topicPartition)
+      quota = UnboundedQuota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED).find(_._1 == topicPartition)
     val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
     assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
       new SimpleRecord(firstReadRecord))
@@ -174,7 +185,8 @@ class SimpleFetchTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = UnboundedQuota).find(_._1 == topicPartition)
+      quota = UnboundedQuota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED).find(_._1 == topicPartition)
 
     val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
     assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index aae58cc..60b18d2 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -833,7 +833,8 @@ object TestUtils extends Logging {
   /**
    * Wait until the given condition is true or throw an exception if the given wait time elapses.
    */
-  def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
+  def waitUntilTrue(condition: () => Boolean, msg: => String,
+                    waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
     val startTime = System.currentTimeMillis()
     while (true) {
       if (condition())