You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/04 23:37:01 UTC
kafka git commit: KAFKA-5355; Test cases to ensure isolation level propagated in delayed fetch
Repository: kafka
Updated Branches:
refs/heads/trunk c7bc8f7d8 -> 3557f097b
KAFKA-5355; Test cases to ensure isolation level propagated in delayed fetch
Include a few logging improvements.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3230 from hachikuji/KAFKA-5355-TESTS
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3557f097
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3557f097
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3557f097
Branch: refs/heads/trunk
Commit: 3557f097b81edd6518100d79b442223fadf9f81f
Parents: c7bc8f7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 5 00:36:16 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 5 00:36:26 2017 +0100
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 19 +-
.../producer/internals/TransactionManager.java | 8 +
.../common/requests/TxnOffsetCommitRequest.java | 4 +
.../group/GroupMetadataManager.scala | 5 +-
core/src/main/scala/kafka/log/Log.scala | 9 +-
.../main/scala/kafka/server/DelayedFetch.scala | 3 +-
.../scala/kafka/server/ReplicaManager.scala | 40 +++-
.../kafka/api/TransactionsTest.scala | 108 +++++++---
.../ReplicaFetcherThreadFatalErrorTest.scala | 4 +-
.../test/scala/other/kafka/StressTestLog.scala | 3 +-
.../group/GroupMetadataManagerTest.scala | 3 +-
.../TransactionStateManagerTest.scala | 4 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 11 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 56 +++---
.../kafka/server/ReplicaManagerQuotasTest.scala | 20 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 197 ++++++++++---------
.../unit/kafka/server/SimpleFetchTest.scala | 26 ++-
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
19 files changed, 323 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9be8aa3..e73ff4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -198,7 +198,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
- log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
+ log.debug("Sending {} fetch for partitions {} to broker {}", isolationLevel, request.fetchData().keySet(),
+ fetchTarget);
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
@@ -221,9 +222,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
long fetchOffset = request.fetchData().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();
- log.debug("Fetch at offset {} for partition {} returned fetch data {}", fetchOffset,
- partition, fetchData);
-
+ log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+ isolationLevel, fetchOffset, partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
@@ -782,8 +782,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
long position = this.subscriptions.position(partition);
- fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize));
- log.debug("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
+ fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
+ this.fetchSize));
+ log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,
+ partition, position, node);
} else {
log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
}
@@ -1038,8 +1040,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (containsAbortMarker(currentBatch)) {
abortedProducerIds.remove(producerId);
} else if (isBatchAborted(currentBatch)) {
- log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition {}",
- producerId, currentBatch.baseOffset(), partition);
+ log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
+ "offsets {} to {}",
+ partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
nextFetchOffset = currentBatch.nextOffset();
continue;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index dad6b5d..c081b23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -212,6 +212,7 @@ public class TransactionManager {
throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
"active transaction");
+ log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId);
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
@@ -226,6 +227,7 @@ public class TransactionManager {
if (partitionsInTransaction.contains(topicPartition))
return;
+ log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
newPartitionsInTransaction.add(topicPartition);
}
@@ -749,6 +751,7 @@ public class TransactionManager {
abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
} else {
Set<TopicPartition> addedPartitions = errors.keySet();
+ log.debug("{}Successfully added partitions {} to transaction", logPrefix, addedPartitions);
partitionsInTransaction.addAll(addedPartitions);
pendingPartitionsInTransaction.removeAll(addedPartitions);
transactionStarted = true;
@@ -886,6 +889,9 @@ public class TransactionManager {
Errors error = addOffsetsToTxnResponse.error();
if (error == Errors.NONE) {
+ log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
+ builder.consumerGroupId());
+
// note the result is not completed until the TxnOffsetCommit returns
pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
transactionStarted = true;
@@ -946,6 +952,8 @@ public class TransactionManager {
TopicPartition topicPartition = entry.getKey();
Errors error = entry.getValue();
if (error == Errors.NONE) {
+ log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix,
+ builder.offsets(), builder.consumerGroupId());
pendingTxnOffsetCommits.remove(topicPartition);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 2ea8ecf..20522af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -59,6 +59,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
return consumerGroupId;
}
+ public Map<TopicPartition, CommittedOffset> offsets() {
+ return offsets;
+ }
+
@Override
public TxnOffsetCommitRequest build(short version) {
return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index b23514a..db3d936 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -484,9 +484,8 @@ class GroupMetadataManager(brokerId: Int,
val removedGroups = mutable.Set[String]()
while (currOffset < highWaterMark && !shuttingDown.get()) {
- val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true,
- isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-
+ val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None,
+ minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
val memRecords = fetchDataInfo.records match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b9968a2..5522508 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -830,6 +830,11 @@ class Log(@volatile var dir: File,
}
}
+ private[log] def readUncommitted(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None,
+ minOneMessage: Boolean = false): FetchDataInfo = {
+ read(startOffset, maxLength, maxOffset, minOneMessage, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+ }
+
/**
* Read messages from the log.
*
@@ -849,7 +854,7 @@ class Log(@volatile var dir: File,
* @return The fetch data information including fetch starting offset metadata and messages read.
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
- isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): FetchDataInfo = {
+ isolationLevel: IsolationLevel): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// Because we don't use lock for reading, the synchronization is a little bit tricky.
@@ -1001,7 +1006,7 @@ class Log(@volatile var dir: File,
*/
def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
try {
- val fetchDataInfo = read(offset, 1)
+ val fetchDataInfo = readUncommitted(offset, 1)
fetchDataInfo.fetchOffsetMetadata
} catch {
case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index fb89800..c5cdf57 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -153,8 +153,7 @@ class DelayedFetch(delayMs: Long,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
quota = quota,
- isolationLevel = isolationLevel
- )
+ isolationLevel = isolationLevel)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5e1c9c1..a8d3e94 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -129,7 +129,36 @@ class ReplicaManager(val config: KafkaConfig,
quotaManager: ReplicationQuotaManager,
val brokerTopicStats: BrokerTopicStats,
val metadataCache: MetadataCache,
- threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+ val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+ val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+ val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+ threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
+
+ def this(config: KafkaConfig,
+ metrics: Metrics,
+ time: Time,
+ zkUtils: ZkUtils,
+ scheduler: Scheduler,
+ logManager: LogManager,
+ isShuttingDown: AtomicBoolean,
+ quotaManager: ReplicationQuotaManager,
+ brokerTopicStats: BrokerTopicStats,
+ metadataCache: MetadataCache,
+ threadNamePrefix: Option[String] = None) {
+ this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
+ quotaManager, brokerTopicStats, metadataCache,
+ DelayedOperationPurgatory[DelayedProduce](
+ purgatoryName = "Produce", brokerId = config.brokerId,
+ purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+ DelayedOperationPurgatory[DelayedFetch](
+ purgatoryName = "Fetch", brokerId = config.brokerId,
+ purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+ DelayedOperationPurgatory[DelayedDeleteRecords](
+ purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+ purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+ threadNamePrefix)
+ }
+
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private val localBrokerId = config.brokerId
@@ -146,13 +175,6 @@ class ReplicaManager(val config: KafkaConfig,
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
- val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
- purgatoryName = "Produce", brokerId = localBrokerId, purgeInterval = config.producerPurgatoryPurgeIntervalRequests)
- val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
- purgatoryName = "Fetch", brokerId = localBrokerId, purgeInterval = config.fetchPurgatoryPurgeIntervalRequests)
- val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords](
- purgatoryName = "DeleteRecords", brokerId = localBrokerId, purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests)
-
val leaderCount = newGauge(
"LeaderCount",
new Gauge[Int] {
@@ -641,7 +663,7 @@ class ReplicaManager(val config: KafkaConfig,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
- isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Seq[(TopicPartition, LogReadResult)] = {
+ isolationLevel: IsolationLevel): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 9aceec8..4593d9c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
-import org.apache.kafka.clients.producer.KafkaProducer
+import kafka.utils.TestUtils.consumeRecords
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.protocol.SecurityProtocol
@@ -32,7 +33,7 @@ import org.junit.{After, Before, Test}
import org.junit.Assert._
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
class TransactionsTest extends KafkaServerTestHarness {
@@ -49,7 +50,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val nonTransactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps()))
}
@Before
@@ -62,11 +63,11 @@ class TransactionsTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
for (_ <- 0 until transactionalProducerCount)
- transactionalProducers += TestUtils.createTransactionalProducer("transactional-producer", servers)
+ createTransactionalProducer("transactional-producer")
for (_ <- 0 until transactionalConsumerCount)
- transactionalConsumers += transactionalConsumer("transactional-group")
+ createReadCommittedConsumer("transactional-group")
for (_ <- 0 until nonTransactionalConsumerCount)
- nonTransactionalConsumers += nonTransactionalConsumer("non-transactional-group")
+ createReadUncommittedConsumer("non-transactional-group")
}
@After
@@ -79,9 +80,9 @@ class TransactionsTest extends KafkaServerTestHarness {
@Test
def testBasicTransactions() = {
- val producer = transactionalProducers(0)
- val consumer = transactionalConsumers(0)
- val unCommittedConsumer = nonTransactionalConsumers(0)
+ val producer = transactionalProducers.head
+ val consumer = transactionalConsumers.head
+ val unCommittedConsumer = nonTransactionalConsumers.head
producer.initTransactions()
@@ -99,12 +100,12 @@ class TransactionsTest extends KafkaServerTestHarness {
consumer.subscribe(List(topic1, topic2).asJava)
unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
- val records = pollUntilExactlyNumRecords(consumer, 2)
+ val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
- val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+ val allRecords = consumeRecords(unCommittedConsumer, 4)
val expectedValues = List("1", "2", "3", "4").toSet
allRecords.foreach { record =>
assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
@@ -112,6 +113,53 @@ class TransactionsTest extends KafkaServerTestHarness {
}
@Test
+ def testReadCommittedConsumerShouldNotSeeUndecidedData(): Unit = {
+ val producer1 = transactionalProducers.head
+ val producer2 = createTransactionalProducer("other")
+ val readCommittedConsumer = transactionalConsumers.head
+ val readUncommittedConsumer = nonTransactionalConsumers.head
+
+ producer1.initTransactions()
+ producer2.initTransactions()
+
+ producer1.beginTransaction()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "1".getBytes))
+ producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "1".getBytes))
+ producer2.flush()
+
+ producer1.send(new ProducerRecord(topic1, 0, "a".getBytes, "1".getBytes))
+ producer1.send(new ProducerRecord(topic1, 0, "b".getBytes, "2".getBytes))
+ producer1.send(new ProducerRecord(topic2, 0, "c".getBytes, "3".getBytes))
+ producer1.send(new ProducerRecord(topic2, 0, "d".getBytes, "4".getBytes))
+ producer1.flush()
+
+ producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "2".getBytes))
+ producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "2".getBytes))
+ producer2.commitTransaction()
+
+ // ensure the records are visible to the read uncommitted consumer
+ readUncommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+ consumeRecords(readUncommittedConsumer, 8)
+ readUncommittedConsumer.unsubscribe()
+
+ // we should only see the first two records which come before the undecided second transaction
+ readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+ val records = consumeRecords(readCommittedConsumer, 2)
+ records.foreach { record =>
+ assertEquals("x", new String(record.key))
+ assertEquals("1", new String(record.value))
+ }
+
+ // even if we seek to the end, we should not be able to see the undecided data
+ assertEquals(2, readCommittedConsumer.assignment.size)
+ readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
+ readCommittedConsumer.assignment.asScala.foreach { tp =>
+ assertEquals(1L, readCommittedConsumer.position(tp))
+ }
+ }
+
+ @Test
def testSendOffsets() = {
// The basic plan for the test is as follows:
// 1. Seed topic1 with 1000 unique, numbered, messages.
@@ -129,7 +177,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val producer = transactionalProducers(0)
- val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+ val consumer = createReadCommittedConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
consumer.subscribe(List(topic1).asJava)
producer.initTransactions()
@@ -208,7 +256,7 @@ class TransactionsTest extends KafkaServerTestHarness {
producer2.commitTransaction() // ok
- val records = pollUntilExactlyNumRecords(consumer, 2)
+ val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
@@ -246,7 +294,7 @@ class TransactionsTest extends KafkaServerTestHarness {
producer2.commitTransaction() // ok
- val records = pollUntilExactlyNumRecords(consumer, 2)
+ val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
@@ -290,7 +338,7 @@ class TransactionsTest extends KafkaServerTestHarness {
producer2.commitTransaction() // ok
- val records = pollUntilExactlyNumRecords(consumer, 2)
+ val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
@@ -336,7 +384,7 @@ class TransactionsTest extends KafkaServerTestHarness {
producer2.commitTransaction() // ok
- val records = pollUntilExactlyNumRecords(consumer, 2)
+ val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
@@ -345,7 +393,7 @@ class TransactionsTest extends KafkaServerTestHarness {
private def serverProps() = {
val serverProps = new Properties()
serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
- // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
+ // Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
@@ -354,33 +402,35 @@ class TransactionsTest extends KafkaServerTestHarness {
serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+ serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
serverProps
}
- private def transactionalConsumer(group: String = "group", maxPollRecords: Int = 500) = {
+ private def createReadCommittedConsumer(group: String = "group", maxPollRecords: Int = 500) = {
val props = new Properties()
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
- TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+ val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+ transactionalConsumers += consumer
+ consumer
}
- private def nonTransactionalConsumer(group: String = "group") = {
+ private def createReadUncommittedConsumer(group: String = "group") = {
val props = new Properties()
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+ val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+ nonTransactionalConsumers += consumer
+ consumer
}
- private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
- val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
- TestUtils.waitUntilTrue(() => {
- records ++= consumer.poll(50).asScala
- records.size == numRecords
- }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
- records
+ private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+ transactionalProducers += producer
+ producer
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index cd0c74b..147e84a 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -117,9 +117,9 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
quotaManager: ReplicationQuotaManager) =
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
- val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
+ val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
- fetcherThread(new FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
+ fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
time, quotaManager))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 7583d45..6c134ac 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -25,6 +25,7 @@ import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.utils.Utils
/**
@@ -99,7 +100,7 @@ object StressTestLog {
@volatile var offset = 0
override def work() {
try {
- log.read(offset, 1024, Some(offset+1)).records match {
+ log.read(offset, 1024, Some(offset+1), isolationLevel = IsolationLevel.READ_UNCOMMITTED).records match {
case read: FileRecords if read.sizeInBytes > 0 => {
val first = read.batches.iterator.next()
require(first.lastOffset == offset, "We should either read nothing or the message we asked for.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 4c2eb27..6245e85 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1310,7 +1310,8 @@ class GroupMetadataManagerTest {
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
- EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+ EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+ EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
.andReturn(records.buffer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 54246c4..b5d7903 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -517,8 +517,8 @@ class TransactionStateManagerTest {
EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
- EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true),
- EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+ EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+ EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
.andReturn(records.buffer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index ee46341..3d19442 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,
new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
- def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next()
+ def readBatch(offset: Int) = log.readUncommitted(offset, 4096).records.batches.iterator.next()
if (!brokerCompression.equals("producer")) {
val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a6fe2e4..5b29471 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,10 +21,12 @@ import java.io._
import java.util.Properties
import kafka.common._
+import kafka.server.FetchDataInfo
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -105,10 +107,10 @@ class LogManagerTest {
// there should be a log file, two indexes, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
- assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
+ assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset+1, 1024).records.sizeInBytes)
try {
- log.read(0, 1024)
+ log.readUncommitted(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
case _: OffsetOutOfRangeException => // This is good.
@@ -154,9 +156,9 @@ class LogManagerTest {
// there should be a log file, two indexes (the txn index is created lazily),
// the leader epoch checkpoint and two pid mapping files (one for the active and previous segments)
assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length)
- assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
+ assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes)
try {
- log.read(0, 1024)
+ log.readUncommitted(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
case _: OffsetOutOfRangeException => // This is good.
@@ -302,7 +304,6 @@ class LogManagerTest {
}
}
-
private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = {
TestUtils.createLogManager(
defaultConfig = logConfig,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6fcc7ae..7b67857 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -551,7 +551,7 @@ class LogTest {
log.appendAsFollower(memoryRecords)
log.flush()
- val fetchedData = log.read(0, Int.MaxValue)
+ val fetchedData = log.readUncommitted(0, Int.MaxValue)
val origIterator = memoryRecords.batches.iterator()
for (batch <- fetchedData.records.batches.asScala) {
@@ -736,13 +736,14 @@ class LogTest {
log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0)
for(i <- values.indices) {
- val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+ val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next()
assertEquals("Offset read should match order appended.", i, read.lastOffset)
val actual = read.iterator.next()
assertNull("Key should be null", actual.key)
assertEquals("Values not equal", ByteBuffer.wrap(values(i)), actual.value)
}
- assertEquals("Reading beyond the last message returns nothing.", 0, log.read(values.length, 100, None).records.batches.asScala.size)
+ assertEquals("Reading beyond the last message returns nothing.", 0,
+ log.readUncommitted(values.length, 100, None).records.batches.asScala.size)
}
/**
@@ -763,7 +764,7 @@ class LogTest {
log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
for(i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
- val read = log.read(i, 100, None).records.records.iterator.next()
+ val read = log.readUncommitted(i, 100, None).records.records.iterator.next()
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
}
@@ -790,7 +791,7 @@ class LogTest {
log.logSegments.head.truncateTo(1)
assertEquals("A read should now return the last message in the log", log.logEndOffset - 1,
- log.read(1, 200, None).records.batches.iterator.next().lastOffset)
+ log.readUncommitted(1, 200, None).records.batches.iterator.next().lastOffset)
}
@Test
@@ -809,16 +810,16 @@ class LogTest {
for (i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
val reads = Seq(
- log.read(i, 1, minOneMessage = true),
- log.read(i, 100, minOneMessage = true),
- log.read(i, 100, Some(10000), minOneMessage = true)
+ log.readUncommitted(i, 1, minOneMessage = true),
+ log.readUncommitted(i, 100, minOneMessage = true),
+ log.readUncommitted(i, 100, Some(10000), minOneMessage = true)
).map(_.records.records.iterator.next())
reads.foreach { read =>
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
}
- assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq)
+ assertEquals(Seq.empty, log.readUncommitted(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq)
}
}
@@ -836,14 +837,14 @@ class LogTest {
log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
for (i <- 50 until messageIds.max) {
- assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records)
+ assertEquals(MemoryRecords.EMPTY, log.readUncommitted(i, 0).records)
// we return an incomplete message instead of an empty one for the case below
// we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is
// larger than the fetch size
// in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty
// partition
- val fetchInfo = log.read(i, 1)
+ val fetchInfo = log.readUncommitted(i, 1)
assertTrue(fetchInfo.firstEntryIncomplete)
assertTrue(fetchInfo.records.isInstanceOf[FileRecords])
assertEquals(1, fetchInfo.records.sizeInBytes)
@@ -867,23 +868,25 @@ class LogTest {
brokerTopicStats = brokerTopicStats, time = time)
log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
- assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
+ assertEquals("Reading at the log end offset should produce 0 byte read.", 0,
+ log.readUncommitted(1025, 1000).records.sizeInBytes)
try {
- log.read(0, 1000)
+ log.readUncommitted(0, 1000)
fail("Reading below the log start offset should throw OffsetOutOfRangeException")
} catch {
case _: OffsetOutOfRangeException => // This is good.
}
try {
- log.read(1026, 1000)
+ log.readUncommitted(1026, 1000)
fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
} catch {
case _: OffsetOutOfRangeException => // This is good.
}
- assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).records.sizeInBytes)
+ assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0,
+ log.readUncommitted(1025, 1000, Some(1024)).records.sizeInBytes)
}
/**
@@ -906,7 +909,7 @@ class LogTest {
/* do successive reads to ensure all our messages are there */
var offset = 0L
for(i <- 0 until numMessages) {
- val messages = log.read(offset, 1024*1024).records.batches
+ val messages = log.readUncommitted(offset, 1024*1024).records.batches
val head = messages.iterator.next()
assertEquals("Offsets not equal", offset, head.lastOffset)
@@ -917,7 +920,8 @@ class LogTest {
assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp)
offset = head.lastOffset + 1
}
- val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
+ val lastRead = log.readUncommitted(startOffset = numMessages, maxLength = 1024*1024,
+ maxOffset = Some(numMessages + 1)).records
assertEquals("Should be no more messages", 0, lastRead.records.asScala.size)
// check that rolling the log forced a flush of the log--the flush is async so retry in case of failure
@@ -941,7 +945,7 @@ class LogTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)), leaderEpoch = 0)
- def read(offset: Int) = log.read(offset, 4096).records.records
+ def read(offset: Int) = log.readUncommitted(offset, 4096).records.records
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).iterator.next().offset)
@@ -1152,7 +1156,7 @@ class LogTest {
val messages = (0 until numMessages).map { i =>
MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
}
- messages.foreach(log.appendAsFollower(_))
+ messages.foreach(log.appendAsFollower)
val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries)
assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}",
@@ -1190,7 +1194,7 @@ class LogTest {
assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
for(i <- 0 until numMessages) {
- assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
+ assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
if (i == 0)
assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
else
@@ -1215,7 +1219,8 @@ class LogTest {
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
for(i <- 0 until numMessages)
- log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
+ log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
+ timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
log.close()
@@ -1226,11 +1231,10 @@ class LogTest {
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val segArray = log.logSegments.toArray
- for (i <- 0 until segArray.size - 1) {
+ for (i <- segArray.indices.init) {
assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
assertEquals("The time index file size should be 0", 0, segArray(i).timeIndex.file.length)
}
-
}
/**
@@ -1272,7 +1276,7 @@ class LogTest {
brokerTopicStats = brokerTopicStats, time = time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages) {
- assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
+ assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
if (i == 0)
assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
else
@@ -1544,7 +1548,7 @@ class LogTest {
brokerTopicStats = brokerTopicStats,
time = time)
log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
- val head = log.read(0, 4096, None).records.records.iterator.next()
+ val head = log.readUncommitted(0, 4096, None).records.records.iterator.next()
assertEquals(0, head.offset)
assertTrue("Message payload should be null.", !head.hasValue)
}
@@ -2032,7 +2036,7 @@ class LogTest {
//Then leader epoch should be set on messages
for (i <- records.indices) {
- val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+ val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next()
assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index e770106..2ee08a2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -42,7 +42,7 @@ class ReplicaManagerQuotasTest {
val topicPartition1 = new TopicPartition("test-topic", 1)
val topicPartition2 = new TopicPartition("test-topic", 2)
val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 0, 100), topicPartition2 -> new PartitionData(0, 0, 100))
- var replicaManager: ReplicaManager = null
+ var replicaManager: ReplicaManager = _
@Test
def shouldExcludeSubsequentThrottledPartitions(): Unit = {
@@ -61,7 +61,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = quota)
+ quota = quota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
@@ -86,7 +87,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = quota)
+ quota = quota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -110,7 +112,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = quota)
+ quota = quota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
@@ -134,7 +137,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = quota)
+ quota = quota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
@@ -153,14 +157,16 @@ class ReplicaManagerQuotasTest {
expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
//if we ask for len 1 return a message
- expect(log.read(anyObject(), geq(1), anyObject(), anyObject(), anyObject())).andReturn(
+ expect(log.read(anyObject(), geq(1), anyObject(), anyObject(),
+ EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))).andReturn(
FetchDataInfo(
new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.withRecords(CompressionType.NONE, record)
)).anyTimes()
//if we ask for len = 0, return 0 messages
- expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(), anyObject())).andReturn(
+ expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(),
+ EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))).andReturn(
FetchDataInfo(
new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.EMPTY
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6efd0a3..a33968a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.log.LogConfig
import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
import TestUtils.createBroker
+import kafka.utils.timer.MockTimer
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
@@ -165,26 +166,12 @@ class ReplicaManagerTest {
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
- // Append a message.
- rm.appendRecords(
- timeout = 1000,
- requiredAcks = -1,
- internalTopicsAllowed = false,
- isFromClient = true,
- entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(CompressionType.NONE,
- new SimpleRecord("first message".getBytes()))),
- responseCallback = produceCallback)
+ val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
+ appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback)
// Fetch some messages
- rm.fetchMessages(
- timeout = 1000,
- replicaId = -1,
- fetchMinBytes = 100000,
- fetchMaxBytes = Int.MaxValue,
- hardMaxBytesLimit = false,
- fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
- responseCallback = fetchCallback,
- isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+ fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), fetchCallback,
+ minBytes = 100000)
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
@@ -214,23 +201,32 @@ class ReplicaManagerTest {
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
EasyMock.replay(metadataCache)
- val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+
+ val timer = new MockTimer
+ val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+ purgatoryName = "Produce", timer, reaperEnabled = false)
+ val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+ purgatoryName = "Fetch", timer, reaperEnabled = false)
+ val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
+ purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
+
+ val replicaManager = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
- metadataCache, Option(this.getClass.getName))
+ metadataCache, mockProducePurgatory, mockFetchPurgatory, mockDeleteRecordsPurgatory, Option(this.getClass.getName))
try {
val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
- val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
+ val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
partition.getOrCreateReplica(0)
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
- rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
+ replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
+ replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) =
responseStatus.values.foreach { status =>
@@ -245,13 +241,7 @@ class ReplicaManagerTest {
for (sequence <- 0 until numRecords) {
val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence,
new SimpleRecord(s"message $sequence".getBytes))
- rm.appendRecords(
- timeout = 1000,
- requiredAcks = -1,
- internalTopicsAllowed = false,
- isFromClient = true,
- entriesPerPartition = Map(new TopicPartition(topic, 0) -> records),
- responseCallback = produceCallback)
+ appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback)
}
var fetchCallbackFired = false
@@ -263,26 +253,22 @@ class ReplicaManagerTest {
fetchCallbackFired = true
}
- def fetchMessages(fetchInfos: Seq[(TopicPartition, PartitionData)],
- isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
- rm.fetchMessages(
- timeout = 1000,
- replicaId = 1,
- fetchMinBytes = 0,
- fetchMaxBytes = Int.MaxValue,
- hardMaxBytesLimit = false,
- fetchInfos = fetchInfos,
- responseCallback = fetchCallback,
- isolationLevel = isolationLevel)
- }
-
// fetch as follower to advance the high watermark
- fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)),
- isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+ fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)),
+ fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
// fetch should return empty since LSO should be stuck at 0
- fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
- isolationLevel = IsolationLevel.READ_COMMITTED)
+ fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+ fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
+ assertTrue(fetchCallbackFired)
+ assertEquals(Errors.NONE, fetchError)
+ assertTrue(fetchedRecords.batches.asScala.isEmpty)
+ fetchCallbackFired = false
+
+ // delayed fetch should time out and return nothing
+ fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+ fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000)
+ timer.advanceClock(1001)
assertTrue(fetchCallbackFired)
assertEquals(Errors.NONE, fetchError)
@@ -292,18 +278,13 @@ class ReplicaManagerTest {
// now commit the transaction
val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
- rm.appendRecords(
- timeout = 1000,
- requiredAcks = -1,
- internalTopicsAllowed = false,
- isFromClient = false,
- entriesPerPartition = Map(new TopicPartition(topic, 0) -> commitRecordBatch),
- responseCallback = produceCallback)
+ appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> commitRecordBatch), produceCallback,
+ isFromClient = false)
// the LSO has advanced, but the appended commit marker has not been replicated, so
// none of the data from the transaction should be visible yet
- fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
- isolationLevel = IsolationLevel.READ_COMMITTED)
+ fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+ fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
assertTrue(fetchCallbackFired)
assertEquals(Errors.NONE, fetchError)
@@ -311,18 +292,18 @@ class ReplicaManagerTest {
fetchCallbackFired = false
// fetch as follower to advance the high watermark
- fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
- isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+ fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
+ fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
// now all of the records should be fetchable
- fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
- isolationLevel = IsolationLevel.READ_COMMITTED)
+ fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+ fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED)
assertTrue(fetchCallbackFired)
assertEquals(Errors.NONE, fetchError)
assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size)
} finally {
- rm.shutdown(checkpointHW = false)
+ replicaManager.shutdown(checkpointHW = false)
}
}
@@ -349,10 +330,10 @@ class ReplicaManagerTest {
try {
val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
-
+
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
partition.getOrCreateReplica(0)
-
+
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
@@ -361,17 +342,13 @@ class ReplicaManagerTest {
rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
-
+
// Append a couple of messages.
- for(i <- 1 to 2)
- rm.appendRecords(
- timeout = 1000,
- requiredAcks = -1,
- internalTopicsAllowed = false,
- isFromClient = true,
- entriesPerPartition = Map(new TopicPartition(topic, 0) -> TestUtils.singletonRecords("message %d".format(i).getBytes)),
- responseCallback = produceCallback)
-
+ for(i <- 1 to 2) {
+ val records = TestUtils.singletonRecords(s"message $i".getBytes)
+ appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback)
+ }
+
var fetchCallbackFired = false
var fetchError = Errors.NONE
var fetchedRecords: Records = null
@@ -380,35 +357,16 @@ class ReplicaManagerTest {
fetchedRecords = responseStatus.map(_._2).head.records
fetchCallbackFired = true
}
-
+
// Fetch a message above the high watermark as a follower
- rm.fetchMessages(
- timeout = 1000,
- replicaId = 1,
- fetchMinBytes = 0,
- fetchMaxBytes = Int.MaxValue,
- hardMaxBytesLimit = false,
- fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
- responseCallback = fetchCallback,
- isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-
-
+ fetchAsFollower(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback)
assertTrue(fetchCallbackFired)
assertEquals("Should not give an exception", Errors.NONE, fetchError)
assertTrue("Should return some data", fetchedRecords.batches.iterator.hasNext)
fetchCallbackFired = false
-
+
// Fetch a message above the high watermark as a consumer
- rm.fetchMessages(
- timeout = 1000,
- replicaId = -1,
- fetchMinBytes = 0,
- fetchMaxBytes = Int.MaxValue,
- hardMaxBytesLimit = false,
- fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
- responseCallback = fetchCallback,
- isolationLevel = IsolationLevel.READ_UNCOMMITTED)
-
+ fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback)
assertTrue(fetchCallbackFired)
assertEquals("Should not give an exception", Errors.NONE, fetchError)
assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
@@ -416,4 +374,51 @@ class ReplicaManagerTest {
rm.shutdown(checkpointHW = false)
}
}
+
+ private def appendRecords(replicaManager: ReplicaManager,
+ entriesPerPartition: Map[TopicPartition, MemoryRecords],
+ responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+ isFromClient: Boolean = true): Unit = {
+ replicaManager.appendRecords(
+ timeout = 1000,
+ requiredAcks = -1,
+ internalTopicsAllowed = false,
+ isFromClient = isFromClient,
+ entriesPerPartition = entriesPerPartition,
+ responseCallback = responseCallback)
+ }
+
+ private def fetchAsConsumer(replicaManager: ReplicaManager,
+ fetchInfos: Seq[(TopicPartition, PartitionData)],
+ fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+ minBytes: Int = 0,
+ isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
+ fetchMessages(replicaManager, replicaId = -1, fetchInfos, fetchCallback, minBytes, isolationLevel)
+ }
+
+ private def fetchAsFollower(replicaManager: ReplicaManager,
+ fetchInfos: Seq[(TopicPartition, PartitionData)],
+ fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+ minBytes: Int = 0,
+ isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
+ fetchMessages(replicaManager, replicaId = 1, fetchInfos, fetchCallback, minBytes, isolationLevel)
+ }
+
+ private def fetchMessages(replicaManager: ReplicaManager,
+ replicaId: Int,
+ fetchInfos: Seq[(TopicPartition, PartitionData)],
+ fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+ minBytes: Int,
+ isolationLevel: IsolationLevel): Unit = {
+ replicaManager.fetchMessages(
+ timeout = 1000,
+ replicaId = replicaId,
+ fetchMinBytes = minBytes,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ fetchInfos = fetchInfos,
+ responseCallback = fetchCallback,
+ isolationLevel = isolationLevel)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ac851d8..dad4b78 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -62,7 +62,7 @@ class SimpleFetchTest {
val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize))
- var replicaManager: ReplicaManager = null
+ var replicaManager: ReplicaManager = _
@Before
def setUp() {
@@ -80,13 +80,23 @@ class SimpleFetchTest {
EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
- EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true, IsolationLevel.READ_UNCOMMITTED)).andReturn(
- FetchDataInfo(
+ EasyMock.expect(log.read(
+ startOffset = 0,
+ maxLength = fetchSize,
+ maxOffset = Some(partitionHW),
+ minOneMessage = true,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED))
+ .andReturn(FetchDataInfo(
new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
)).anyTimes()
- EasyMock.expect(log.read(0, fetchSize, None, true, IsolationLevel.READ_UNCOMMITTED)).andReturn(
- FetchDataInfo(
+ EasyMock.expect(log.read(
+ startOffset = 0,
+ maxLength = fetchSize,
+ maxOffset = None,
+ minOneMessage = true,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED))
+ .andReturn(FetchDataInfo(
new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
)).anyTimes()
@@ -162,7 +172,8 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = UnboundedQuota).find(_._1 == topicPartition)
+ quota = UnboundedQuota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED).find(_._1 == topicPartition)
val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
new SimpleRecord(firstReadRecord))
@@ -174,7 +185,8 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
- quota = UnboundedQuota).find(_._1 == topicPartition)
+ quota = UnboundedQuota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED).find(_._1 == topicPartition)
val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index aae58cc..60b18d2 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -833,7 +833,8 @@ object TestUtils extends Logging {
/**
* Wait until the given condition is true or throw an exception if the given wait time elapses.
*/
- def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
+ def waitUntilTrue(condition: () => Boolean, msg: => String,
+ waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
val startTime = System.currentTimeMillis()
while (true) {
if (condition())