Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:11 UTC
[1/30] git commit: Merge branch '0.8' into trunk
Updated Branches:
refs/heads/trunk 739b2df68 -> 32dae955d
Merge branch '0.8' into trunk
Conflicts:
core/src/main/scala/kafka/log/Log.scala
core/src/test/scala/unit/kafka/admin/AdminTest.scala
core/src/test/scala/unit/kafka/log/LogTest.scala
core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32dae955
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32dae955
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32dae955
Branch: refs/heads/trunk
Commit: 32dae955d5e2e2dd45bddb628cb07c874241d856
Parents: 739b2df 85ec044
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Dec 18 09:43:41 2012 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Dec 18 09:43:41 2012 -0800
----------------------------------------------------------------------
bin/kafka-producer-shell.sh | 17 --
bin/windows/kafka-server-stop.bat | 18 ++
bin/windows/zookeeper-server-stop.bat | 17 ++
.../src/main/java/kafka/etl/KafkaETLContext.java | 2 +-
core/src/main/scala/kafka/admin/AdminUtils.scala | 5 +-
.../scala/kafka/admin/CreateTopicCommand.scala | 1 +
core/src/main/scala/kafka/api/FetchRequest.scala | 4 +-
core/src/main/scala/kafka/api/FetchResponse.scala | 19 +-
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 10 +-
.../scala/kafka/api/LeaderAndIsrResponse.scala | 10 +-
core/src/main/scala/kafka/api/OffsetRequest.scala | 8 +-
core/src/main/scala/kafka/api/OffsetResponse.scala | 10 +-
.../src/main/scala/kafka/api/ProducerRequest.scala | 8 +-
.../main/scala/kafka/api/ProducerResponse.scala | 9 +-
.../main/scala/kafka/api/StopReplicaRequest.scala | 10 +-
.../main/scala/kafka/api/StopReplicaResponse.scala | 10 +-
core/src/main/scala/kafka/api/TopicMetadata.scala | 120 +++-------
.../scala/kafka/api/TopicMetadataRequest.scala | 13 +-
.../scala/kafka/api/TopicMetadataResponse.scala | 35 ++-
core/src/main/scala/kafka/client/ClientUtils.scala | 32 ++-
core/src/main/scala/kafka/cluster/Broker.scala | 20 +-
.../scala/kafka/common/ClientIdAndBroker.scala | 26 ++
.../main/scala/kafka/common/ClientIdAndTopic.scala | 27 ++
core/src/main/scala/kafka/common/Config.scala | 40 +++
core/src/main/scala/kafka/common/Topic.scala | 40 +++
.../scala/kafka/common/TopicExistsException.scala | 22 ++
.../scala/kafka/consumer/ConsoleConsumer.scala | 27 ++-
.../main/scala/kafka/consumer/ConsumerConfig.scala | 33 +++-
.../kafka/consumer/ConsumerFetcherManager.scala | 2 +-
.../scala/kafka/consumer/ConsumerIterator.scala | 8 +-
.../scala/kafka/consumer/ConsumerTopicStat.scala | 40 ---
.../scala/kafka/consumer/ConsumerTopicStats.scala | 57 +++++
.../consumer/FetchRequestAndResponseStats.scala | 58 +++++
.../main/scala/kafka/consumer/KafkaStream.scala | 5 +-
.../scala/kafka/consumer/PartitionTopicInfo.scala | 9 +-
.../main/scala/kafka/consumer/SimpleConsumer.scala | 44 ++--
.../consumer/ZookeeperConsumerConnector.scala | 23 +-
.../scala/kafka/controller/KafkaController.scala | 2 +-
.../kafka/controller/PartitionLeaderSelector.scala | 6 +-
.../kafka/controller/PartitionStateMachine.scala | 4 +-
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../scala/kafka/javaapi/TopicMetadataRequest.scala | 7 +-
.../kafka/javaapi/consumer/SimpleConsumer.scala | 6 +-
.../consumer/ZookeeperConsumerConnector.scala | 1 -
core/src/main/scala/kafka/log/Log.scala | 13 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
core/src/main/scala/kafka/message/Message.scala | 66 +++---
.../kafka/metrics/KafkaCSVMetricsReporter.scala | 1 -
.../scala/kafka/producer/BrokerPartitionInfo.scala | 8 +-
.../scala/kafka/producer/ConsoleProducer.scala | 67 ++++-
.../scala/kafka/producer/DefaultPartitioner.scala | 1 -
core/src/main/scala/kafka/producer/Producer.scala | 50 +---
.../main/scala/kafka/producer/ProducerConfig.scala | 31 +++-
.../main/scala/kafka/producer/ProducerPool.scala | 18 +-
.../kafka/producer/ProducerRequestStats.scala | 56 +++++
.../main/scala/kafka/producer/ProducerStats.scala | 40 +++
.../scala/kafka/producer/ProducerTopicStats.scala | 57 +++++
.../main/scala/kafka/producer/SyncProducer.scala | 24 +-
.../scala/kafka/producer/SyncProducerConfig.scala | 8 +-
.../kafka/producer/async/AsyncProducerStats.scala | 25 --
.../kafka/producer/async/DefaultEventHandler.scala | 65 +++---
.../kafka/producer/async/ProducerSendThread.scala | 11 +-
core/src/main/scala/kafka/serializer/Decoder.scala | 8 -
core/src/main/scala/kafka/serializer/Encoder.scala | 2 -
.../scala/kafka/server/AbstractFetcherThread.scala | 52 +++--
.../kafka/server/HighwaterMarkCheckpoint.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 81 ++++---
core/src/main/scala/kafka/server/KafkaConfig.scala | 8 +-
.../scala/kafka/server/KafkaRequestHandler.scala | 8 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 8 +-
.../main/scala/kafka/server/KafkaZooKeeper.scala | 3 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
.../main/scala/kafka/server/RequestPurgatory.scala | 76 +++---
.../scala/kafka/tools/ConsumerOffsetChecker.scala | 2 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 16 +-
.../main/scala/kafka/tools/GetOffsetShell.scala | 2 +-
core/src/main/scala/kafka/tools/MirrorMaker.scala | 1 -
.../src/main/scala/kafka/tools/ProducerShell.scala | 71 ------
.../main/scala/kafka/tools/ReplayLogProducer.scala | 2 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 10 +-
.../main/scala/kafka/tools/UpdateOffsetsInZK.scala | 2 +-
core/src/main/scala/kafka/utils/Topic.scala | 41 ---
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +-
.../test/scala/other/kafka/TestKafkaAppender.scala | 1 -
.../scala/other/kafka/TestZKConsumerOffsets.scala | 1 -
.../test/scala/unit/kafka/admin/AdminTest.scala | 55 ++--
.../api/RequestResponseSerializationTest.scala | 21 +-
.../test/scala/unit/kafka/common/ConfigTest.scala | 89 +++++++
.../test/scala/unit/kafka/common/TopicTest.scala | 61 +++++
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 10 +-
.../kafka/integration/AutoOffsetResetTest.scala | 1 -
.../scala/unit/kafka/integration/FetcherTest.scala | 10 +-
.../kafka/integration/LazyInitProducerTest.scala | 2 +-
.../unit/kafka/integration/PrimitiveApiTest.scala | 1 -
.../integration/ProducerConsumerTestHarness.scala | 6 +-
.../unit/kafka/integration/TopicMetadataTest.scala | 2 +-
.../consumer/ZookeeperConsumerConnectorTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 27 ++
.../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 3 +-
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 58 +++---
.../scala/unit/kafka/producer/ProducerTest.scala | 5 +-
.../unit/kafka/producer/SyncProducerTest.scala | 8 +-
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 10 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 -
.../unit/kafka/server/ServerShutdownTest.scala | 7 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 37 ++--
.../test/scala/unit/kafka/utils/TopicTest.scala | 61 -----
.../java/kafka/examples/SimpleConsumerDemo.java | 3 +-
.../scala/kafka/perf/ProducerPerformance.scala | 19 +-
.../kafka/perf/SimpleConsumerPerformance.scala | 2 +-
project/build/KafkaProject.scala | 2 +-
.../migration_tool_test.py | 14 +-
.../mirror_maker_testsuite/mirror_maker_test.py | 17 +-
.../replication_testsuite/replica_basic_test.py | 35 ++--
.../testcase_9051/cluster_config.json | 63 +-----
.../testcase_9051/testcase_9051_properties.json | 56 +----
system_test/testcase_to_run.json | 2 +-
system_test/utils/kafka_system_test_utils.py | 191 +++++++++++++--
system_test/utils/testcase_env.py | 5 +
123 files changed, 1634 insertions(+), 1078 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index d107d71,66c07af..1654dbf
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -93,26 -143,25 +93,35 @@@ class Log(val dir: File
val ls = dir.listFiles()
if(ls != null) {
for(file <- ls if file.isFile) {
- val filename = file.getName()
- if(!file.canRead) {
++ if(!file.canRead)
+ throw new IOException("Could not read file " + file)
- } else if(filename.endsWith(IndexFileSuffix)) {
- // ensure that we have a corresponding log file for this index file
- val log = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
- if(!log.exists) {
+ val filename = file.getName
+ if(filename.endsWith(DeletedFileSuffix)) {
++ // if the file ends in .deleted, delete it
+ val deleted = file.delete()
+ if(!deleted)
+ warn("Attempt to delete defunct segment file %s failed.".format(filename))
++ } else if(filename.endsWith(IndexFileSuffix)) {
++ // if it is an index file, make sure it has a corresponding .log file
++ val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
++ if(!logFile.exists) {
+ warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+ file.delete()
+ }
} else if(filename.endsWith(LogFileSuffix)) {
- if(!file.canRead)
- throw new IOException("Could not read file " + file)
- val offset = filename.substring(0, filename.length - LogFileSuffix.length).toLong
- // TODO: we should ideally rebuild any missing index files, instead of erroring out
- if(!Log.indexFilename(dir, offset).exists)
- throw new IllegalStateException("Found log file with no corresponding index file.")
- logSegments.add(new LogSegment(dir = dir,
- startOffset = offset,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize))
++ // if it's a log file, load the corresponding log segment
+ val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+ val hasIndex = Log.indexFilename(dir, start).exists
+ val segment = new LogSegment(dir = dir,
+ startOffset = start,
+ indexIntervalBytes = indexIntervalBytes,
+ maxIndexSize = maxIndexSize)
+ if(!hasIndex) {
+ // this can only happen if someone manually deletes the index file
+ error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+ segment.recover(maxMessageSize)
+ }
+ logSegments.put(start, segment)
}
}
}
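The merged loading logic above reduces to a three-way dispatch on the file suffix: finish any interrupted deletes (*.deleted), drop orphaned index files, and load every *.log file, rebuilding a missing index instead of failing as the 0.8 branch did. A minimal standalone sketch of that dispatch (not code from the commit; the recoverSegment callback and the map of plain Files are stand-ins for Log's LogSegment internals):

import java.io.{File, IOException}

object SegmentLoaderSketch {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val DeletedFileSuffix = ".deleted"

  // Returns base offset -> log file, mimicking logSegments.put(start, segment).
  def loadSegments(dir: File, recoverSegment: File => Unit): java.util.TreeMap[Long, File] = {
    val segments = new java.util.TreeMap[Long, File]()
    val ls = dir.listFiles()
    if (ls != null) {
      for (file <- ls if file.isFile) {
        if (!file.canRead)
          throw new IOException("Could not read file " + file)
        val filename = file.getName
        if (filename.endsWith(DeletedFileSuffix)) {
          // leftover from an interrupted delete: finish the job now
          if (!file.delete())
            System.err.println("Attempt to delete defunct segment file %s failed.".format(filename))
        } else if (filename.endsWith(IndexFileSuffix)) {
          // an index with no corresponding log file is an orphan: remove it
          val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
          if (!logFile.exists)
            file.delete()
        } else if (filename.endsWith(LogFileSuffix)) {
          // a log file is always loaded; a missing index is rebuilt, not fatal
          val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
          val index = new File(dir, filename.replace(LogFileSuffix, IndexFileSuffix))
          if (!index.exists)
            recoverSegment(file) // stands in for segment.recover(maxMessageSize)
          segments.put(start, file)
        }
      }
    }
    segments
  }
}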
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/producer/ProducerConfig.scala
index 72f68dc,e559187..6c85577
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@@ -81,8 -111,7 +108,10 @@@ class ProducerConfig private (val props
*/
val producerRetries = props.getInt("producer.num.retries", 3)
+ /**
+ * The amount of time to wait in between retries
+ */
val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+
+ validate(this)
}
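The newly documented producer.retry.backoff.ms pairs with producer.num.retries a few lines above: on a failed send the producer sleeps for the backoff, then retries, up to the configured count. A schematic sketch of that interaction (the flakySend stub is hypothetical; the real retry loop lives in DefaultEventHandler):

import java.util.Properties

object RetryBackoffSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // defaults mirror the config above: 3 retries, 100 ms backoff
    val retries = Option(props.getProperty("producer.num.retries")).map(_.toInt).getOrElse(3)
    val backoffMs = Option(props.getProperty("producer.retry.backoff.ms")).map(_.toInt).getOrElse(100)

    def flakySend(): Boolean = scala.util.Random.nextInt(3) == 0 // stand-in for a broker send

    var remaining = 1 + retries // one initial attempt plus the retries
    var sent = false
    while (!sent && remaining > 0) {
      sent = flakySend()
      remaining -= 1
      if (!sent && remaining > 0)
        Thread.sleep(backoffMs) // the wait this commit documents
    }
    println(if (sent) "send succeeded" else "send failed after all retries")
  }
}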
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index eff627c,5a85b04..ef3b66e
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -244,12 -245,7 +246,12 @@@ class KafkaApis(val requestChannel: Req
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
val log = localReplica.log.get
- val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+ val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+
+ // update stats
- BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).messagesInRate.mark(info.count)
- BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(info.count)
++ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count)
++ BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(info.count)
+
// we may need to increment high watermark since ISR could be down to 1
localReplica.partition.maybeIncrementLeaderHW(localReplica)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
@@@ -418,62 -410,9 +419,62 @@@
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
}
})
- val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
+ val response = OffsetResponse(offsetRequest.correlationId, responseMap)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
+
+ def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+ logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
+ case Some(log) =>
+ fetchOffsetsBefore(log, timestamp, maxNumOffsets)
+ case None =>
+ if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
+ Seq(0L)
+ else
+ Nil
+ }
+ }
+
+ def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+ val segsArray = log.logSegments.toArray
+ var offsetTimeArray: Array[(Long, Long)] = null
+ if(segsArray.last.size > 0)
+ offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
+ else
+ offsetTimeArray = new Array[(Long, Long)](segsArray.length)
+
+ for(i <- 0 until segsArray.length)
+ offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
+ if(segsArray.last.size > 0)
+ offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
+
+ var startIndex = -1
+ timestamp match {
+ case OffsetRequest.LatestTime =>
+ startIndex = offsetTimeArray.length - 1
+ case OffsetRequest.EarliestTime =>
+ startIndex = 0
+ case _ =>
+ var isFound = false
+ debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+ startIndex = offsetTimeArray.length - 1
+ while (startIndex >= 0 && !isFound) {
+ if (offsetTimeArray(startIndex)._2 <= timestamp)
+ isFound = true
+ else
+ startIndex -= 1
+ }
+ }
+
+ val retSize = maxNumOffsets.min(startIndex + 1)
+ val ret = new Array[Long](retSize)
+ for(j <- 0 until retSize) {
+ ret(j) = offsetTimeArray(startIndex)._1
+ startIndex -= 1
+ }
+ // ensure that the returned seq is in descending order of offsets
+ ret.toSeq.sortBy(- _)
+ }
/**
* Service the topic metadata request API
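A worked example makes the new fetchOffsetsBefore concrete. Take a hypothetical log with segment base offsets 0, 4, 8, 12, 16 and log end offset 20 (the shape the updated LogOffsetTest below produces). For LatestTime the walk starts at the synthetic (logEndOffset, now) entry that the code appends when the last segment is non-empty, then steps backward through segment base offsets, returning up to maxNumOffsets offsets in descending order:

object OffsetWalkSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical segment layout: five segments of four messages each
    val segmentBaseOffsets = Seq(0L, 4L, 8L, 12L, 16L)
    val logEndOffset = 20L
    val maxNumOffsets = 10

    // LatestTime: start at the synthetic end-of-log entry, walk backward
    val candidates = (segmentBaseOffsets :+ logEndOffset).reverse
    val offsets = candidates.take(maxNumOffsets)
    println(offsets) // List(20, 16, 12, 8, 4, 0), as LogOffsetTest now asserts
  }
}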
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7e17da4,7a31b51..3b61d2a
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@@ -374,33 -374,38 +374,34 @@@ class AdminTest extends JUnit3Suite wit
var controllerId = ZkUtils.getController(zkClient)
var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
var partitionsRemaining = controller.shutdownBroker(2)
- assertEquals(0, partitionsRemaining)
- var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
- var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
- assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
- assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-
- leaderBeforeShutdown = leaderAfterShutdown
- controllerId = ZkUtils.getController(zkClient)
- controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
- partitionsRemaining = controller.shutdownBroker(1)
- assertEquals(0, partitionsRemaining)
- topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
- leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
- assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
- assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+ try {
+ assertEquals(0, partitionsRemaining)
+ var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+ var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+ assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
- // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
- assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
- leaderBeforeShutdown = leaderAfterShutdown
- controllerId = ZkUtils.getController(zkClient)
- controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
- partitionsRemaining = controller.shutdownBroker(0)
- assertEquals(1, partitionsRemaining)
- topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
- leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
- assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
- assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+ leaderBeforeShutdown = leaderAfterShutdown
+ controllerId = ZkUtils.getController(zkClient)
+ controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+ partitionsRemaining = controller.shutdownBroker(1)
+ assertEquals(0, partitionsRemaining)
+ topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+ leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+ assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
- // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+ assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
- servers.foreach(_.shutdown())
+ leaderBeforeShutdown = leaderAfterShutdown
+ controllerId = ZkUtils.getController(zkClient)
+ controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+ partitionsRemaining = controller.shutdownBroker(0)
+ assertEquals(1, partitionsRemaining)
+ topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+ leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+ assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
+ assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
- }
- finally {
++ } finally {
+ servers.foreach(_.shutdown())
+ }
}
private def checkIfReassignPartitionPathExists(): Boolean = {
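The only structural change in this test is wrapping the assertion sequence in try/finally, so the embedded brokers are shut down even when an assertion fails partway through; previously servers.foreach(_.shutdown()) sat at the end of the straight-line body and a failure leaked the servers. The pattern, reduced to its essentials (a sketch, not test code from the commit):

object CleanupSketch {
  def main(args: Array[String]): Unit = {
    var serversRunning = true // stands in for the embedded KafkaServer instances
    try {
      assert(2 + 2 == 4) // any assertion here may throw without leaking the servers
    } finally {
      serversRunning = false // servers.foreach(_.shutdown()) in the real test
    }
    println("servers shut down: " + !serversRunning)
  }
}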
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogTest.scala
index fbd738a,900d0e2..109474c
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@@ -438,10 -365,59 +438,37 @@@ class LogTest extends JUnitSuite
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
}
-
-
- @Test
- def testAppendWithoutOffsetAssignment() {
- for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
- logDir.mkdir()
- var log = new Log(logDir,
- maxLogFileSize = 64*1024,
- maxMessageSize = config.maxMessageSize,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = true)
- val messages = List("one", "two", "three", "four", "five", "six")
- val ms = new ByteBufferMessageSet(compressionCodec = codec,
- offsetCounter = new AtomicLong(5),
- messages = messages.map(s => new Message(s.getBytes)):_*)
- val firstOffset = ms.shallowIterator.toList.head.offset
- val lastOffset = ms.shallowIterator.toList.last.offset
- val (first, last) = log.append(ms, assignOffsets = false)
- assertEquals(last + 1, log.logEndOffset)
- assertEquals(firstOffset, first)
- assertEquals(lastOffset, last)
- assertTrue(log.read(5, 64*1024).size > 0)
- log.delete()
- }
- }
+
+ /**
+ * When we open a log any index segments without an associated log segment should be deleted.
+ */
+ @Test
+ def testBogusIndexSegmentsAreRemoved() {
+ val bogusIndex1 = Log.indexFilename(logDir, 0)
+ val bogusIndex2 = Log.indexFilename(logDir, 5)
+
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ val log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
++ time.scheduler,
++ maxSegmentSize = set.sizeInBytes * 5,
+ maxMessageSize = config.maxMessageSize,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 1,
+ needsRecovery = false)
+
+ assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
+ assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
+
+ // check that we can append to the log
+ for(i <- 0 until 10)
+ log.append(set)
+
+ log.delete()
+ }
+ /**
+ * Verify that truncation works correctly after re-opening the log
+ */
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes())
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index aa58dce,0000000..f69b379
mode 100644,000000..100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@@ -1,219 -1,0 +1,219 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils.nonthreadsafe
+import kafka.utils.threadsafe
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+
+class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+ val random = new Random()
+ var logDir: File = null
+ var topicLogDir: File = null
+ var server: KafkaServer = null
+ var logSize: Int = 100
+ val brokerPort: Int = 9099
+ var simpleConsumer: SimpleConsumer = null
+ var time: Time = new MockTime()
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ val config: Properties = createBrokerConfig(1, brokerPort)
+ val logDirPath = config.getProperty("log.dir")
+ logDir = new File(logDirPath)
+ time = new MockTime()
+ server = TestUtils.createServer(new KafkaConfig(config), time)
- simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
++ simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
+ }
+
+ @After
+ override def tearDown() {
+ simpleConsumer.close
+ server.shutdown
+ Utils.rm(logDir)
+ super.tearDown()
+ }
+
+ @Test
+ def testGetOffsetsForUnknownTopic() {
+ val topicAndPartition = TopicAndPartition("foo", 0)
+ val request = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+ val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+ offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
+ }
+
+ @Test
+ def testGetOffsetsBeforeLatestTime() {
+ val topicPartition = "kafka-" + 0
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // setup brokers in zookeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
++ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+ replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
++ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+
+ // try to fetch using latest offset
+ val fetchResponse = simpleConsumer.fetch(
+ new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
+ assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
+ }
+
+ @Test
+ def testEmptyLogsGetOffsets() {
+ val topicPartition = "kafka-" + random.nextInt(10)
+ val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+ topicLogDir = new File(topicPartitionPath)
+ topicLogDir.mkdir
+
+ val topic = topicPartition.split("-").head
+
+ // setup brokers in zookeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+
+ var offsetChanged = false
+ for(i <- 1 to 14) {
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val offsetRequest =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+
+ if(consumerOffsets(0) == 1) {
+ offsetChanged = true
+ }
+ }
+ assertFalse(offsetChanged)
+ }
+
+ @Test
+ def testGetOffsetsBeforeNow() {
+ val topicPartition = "kafka-" + random.nextInt(3)
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // setup brokers in zookeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
+
+ val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
++ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
++ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+ }
+
+ @Test
+ def testGetOffsetsBeforeEarliestTime() {
+ val topicPartition = "kafka-" + random.nextInt(3)
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // setup brokers in zookeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
+
+ assertEquals(Seq(0L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(0L), consumerOffsets)
+ }
+
+ private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+ val props = new Properties
+ props.put("brokerid", nodeId.toString)
+ props.put("port", port.toString)
+ props.put("log.dir", getLogDir.getAbsolutePath)
+ props.put("log.flush.interval", "1")
+ props.put("enable.zookeeper", "false")
+ props.put("num.partitions", "20")
+ props.put("log.retention.hours", "10")
+ props.put("log.cleanup.interval.mins", "5")
+ props.put("log.file.size", logSize.toString)
+ props.put("zk.connect", zkConnect.toString)
+ props
+ }
+
+ private def getLogDir(): File = {
+ val dir = TestUtils.tempDir()
+ dir
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------