You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/19 16:56:49 UTC
[2/5] kafka git commit: KAFKA-3025;
Added timetamp to Message and use relative offset.
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2c6311c..ae810eb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -94,9 +94,12 @@ object Defaults {
val LogFlushSchedulerIntervalMs = Long.MaxValue
val LogFlushOffsetCheckpointIntervalMs = 60000
val LogPreAllocateEnable = false
+ val MessageFormatVersion = ApiVersion.latestVersion.toString()
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
+ val MessageTimestampType = "CreateTime"
+ val MessageTimestampDifferenceMaxMs = Long.MaxValue
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -251,9 +254,12 @@ object KafkaConfig {
val LogFlushIntervalMsProp = "log.flush.interval.ms"
val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
val LogPreAllocateProp = "log.preallocate"
+ val MessageFormatVersionProp = "message.format.version"
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
+ val MessageTimestampTypeProp = "message.timestamp.type"
+ val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
@@ -417,6 +423,14 @@ object KafkaConfig {
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with acks=all (or -1)"
+ val MessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion." +
+ "Some Examples are: 0.8.2, 0.9.0.0, 0.10.0. Check ApiVersion for detail. When setting the message format version, " +
+ "user certifies that all the existing messages on disk is at or below that version. Otherwise consumers before 0.10.0.0 will break."
+ val MessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either" +
+ " \"CreateTime\" or \"LogAppendTime\""
+ val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
+ "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected " +
+ "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
@@ -556,7 +570,7 @@ object KafkaConfig {
.define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
.define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
.define(LogDirsProp, STRING, null, HIGH, LogDirsDoc)
- .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinHeaderSize), HIGH, LogSegmentBytesDoc)
+ .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinMessageOverhead), HIGH, LogSegmentBytesDoc)
.define(LogRollTimeMillisProp, LONG, null, HIGH, LogRollTimeMillisDoc)
.define(LogRollTimeHoursProp, INT, Defaults.LogRollHours, atLeast(1), HIGH, LogRollTimeHoursDoc)
@@ -591,6 +605,9 @@ object KafkaConfig {
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
+ .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc)
+ .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc)
+ .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc)
/** ********* Replication configuration ***********/
.define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -781,6 +798,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val logRetentionTimeMillis = getLogRetentionTimeMillis
val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
+ val messageFormatVersion = getString(KafkaConfig.MessageFormatVersionProp)
+ val messageTimestampType = getString(KafkaConfig.MessageTimestampTypeProp)
+ val messageTimestampDifferenceMaxMs = getLong(KafkaConfig.MessageTimestampDifferenceMaxMsProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -958,6 +978,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
)
+ require(interBrokerProtocolVersion.onOrAfter(ApiVersion(messageFormatVersion)),
+ s"message.format.version $messageFormatVersion cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersion")
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 41719e2..e3e185f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,7 +21,7 @@ import java.net.{SocketTimeoutException}
import java.util
import kafka.admin._
-import kafka.api.KAFKA_090
+import kafka.api.KAFKA_0_9_0
import kafka.log.LogConfig
import kafka.log.CleanerConfig
import kafka.log.LogManager
@@ -75,6 +75,9 @@ object KafkaServer {
logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
+ logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.messageFormatVersion)
+ logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.messageTimestampType)
+ logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.messageTimestampDifferenceMaxMs)
logProps
}
}
@@ -197,7 +200,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
kafkaController.startup()
/* start kafka coordinator */
- consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
+ consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaMetricsTime)
consumerCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
@@ -216,7 +219,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
Mx4jLoader.maybeLoad()
/* start dynamic config manager */
- dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
+ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
// Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
@@ -512,7 +515,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val shutdownSucceeded =
// Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
// `RequestHeader`, which is used by `NetworkClient`
- if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_090))
+ if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0))
networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c5f3360..2fdb46c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,7 +23,7 @@ import kafka.admin.AdminUtils
import kafka.cluster.BrokerEndPoint
import kafka.log.LogConfig
import kafka.message.ByteBufferMessageSet
-import kafka.api.KAFKA_090
+import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_9_0}
import kafka.common.{KafkaStorageException, TopicAndPartition}
import ReplicaFetcherThread._
import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
@@ -55,7 +55,10 @@ class ReplicaFetcherThread(name: String,
type REQ = FetchRequest
type PD = PartitionData
- private val fetchRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) 1 else 0
+ private val fetchRequestVersion: Short =
+ if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_0_10_0_IV0)) 2
+ else if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) 1
+ else 0
private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
private val replicaId = brokerConfig.brokerId
private val maxWait = brokerConfig.replicaFetchWaitMaxMs
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 61b6887..16b8c3a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -26,11 +26,12 @@ import kafka.cluster.{Partition, Replica}
import kafka.common._
import kafka.controller.KafkaController
import kafka.log.{LogAppendInfo, LogManager}
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
-InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException}
+InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
+InvalidTimestampException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
@@ -332,7 +333,7 @@ class ReplicaManager(val config: KafkaConfig,
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
- new PartitionResponse(result.errorCode, result.info.firstOffset)) // response status
+ new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
}
if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
@@ -358,9 +359,9 @@ class ReplicaManager(val config: KafkaConfig,
// Just return an error and don't handle the request at all
val responseStatus = messagesPerPartition.map {
case (topicAndPartition, messageSet) =>
- (topicAndPartition ->
- new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
- LogAppendInfo.UnknownLogAppendInfo.firstOffset))
+ (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+ LogAppendInfo.UnknownLogAppendInfo.firstOffset,
+ Message.NoTimestamp))
}
responseCallback(responseStatus)
}
@@ -440,6 +441,10 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
case imse: CorruptRecordException =>
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
+ case ime : InvalidMessageException =>
+ (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(ime)))
+ case itse : InvalidTimestampException =>
+ (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(itse)))
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -568,6 +573,10 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] = {
+ getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap(_.log.map(_.config.messageFormatVersion))
+ }
+
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 73743aa..fe2ce9f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -28,6 +28,7 @@ import kafka.metrics.KafkaMetricsReporter
import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger
@@ -124,7 +125,7 @@ object ConsoleConsumer extends Logging {
}
messageCount += 1
try {
- formatter.writeTo(msg.key, msg.value, System.out)
+ formatter.writeTo(msg.key, msg.value, msg.timestamp, msg.timestampType, System.out)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
@@ -335,7 +336,7 @@ object ConsoleConsumer extends Logging {
}
trait MessageFormatter{
- def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
+ def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream)
def init(props: Properties) {}
@@ -356,12 +357,16 @@ class DefaultMessageFormatter extends MessageFormatter {
lineSeparator = props.getProperty("line.separator").getBytes
}
- def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+ def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+ if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
+ output.write(s"$timestampType:$timestamp".getBytes)
+ output.write(keySeparator)
+ }
if (printKey) {
- output.write(if (key == null) "null".getBytes() else key)
+ output.write(if (key == null) "null".getBytes else key)
output.write(keySeparator)
}
- output.write(if (value == null) "null".getBytes() else value)
+ output.write(if (value == null) "null".getBytes else value)
output.write(lineSeparator)
}
}
@@ -370,17 +375,19 @@ class LoggingMessageFormatter extends MessageFormatter {
private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
val logger = Logger.getLogger(getClass().getName)
- def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream): Unit = {
- defaultWriter.writeTo(key, value, output)
+ def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream): Unit = {
+ defaultWriter.writeTo(key, value, timestamp, timestampType, output)
if(logger.isInfoEnabled)
- logger.info(s"key:${if (key == null) "null" else new String(key)}, value:${if (value == null) "null" else new String(value)}")
+ logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
+ s"key:${if (key == null) "null" else new String(key)}, " +
+ s"value:${if (value == null) "null" else new String(value)}")
}
}
class NoOpMessageFormatter extends MessageFormatter {
override def init(props: Properties) {}
- def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
+ def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream){}
}
class ChecksumMessageFormatter extends MessageFormatter {
@@ -394,8 +401,12 @@ class ChecksumMessageFormatter extends MessageFormatter {
topicStr = ""
}
- def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
- val chksum = new Message(value, key).checksum
+ def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+ val chksum =
+ if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
+ new Message(value, key, timestamp, timestampType, NoCompressionCodec, 0, -1, Message.MagicValue_V1).checksum
+ else
+ new Message(value, key, Message.NoTimestamp, Message.MagicValue_V0).checksum
output.println(topicStr + "checksum:" + chksum)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fd15014..3c41c7c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -274,7 +274,7 @@ object DumpLogSegments {
case NoCompressionCodec =>
getSingleMessageIterator(messageAndOffset)
case _ =>
- ByteBufferMessageSet.deepIterator(message)
+ ByteBufferMessageSet.deepIterator(messageAndOffset)
}
} else
getSingleMessageIterator(messageAndOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a964f69..95b0aad 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -485,7 +485,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
override def receive() : BaseConsumerRecord = {
val messageAndMetadata = iter.next()
- BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
+ BaseConsumerRecord(messageAndMetadata.topic,
+ messageAndMetadata.partition,
+ messageAndMetadata.offset,
+ messageAndMetadata.timestamp,
+ messageAndMetadata.timestampType,
+ messageAndMetadata.key,
+ messageAndMetadata.message)
}
override def stop() {
@@ -541,7 +547,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
offsets.put(tp, record.offset + 1)
- BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
+ BaseConsumerRecord(record.topic,
+ record.partition,
+ record.offset,
+ record.timestamp,
+ record.timestampType,
+ record.key,
+ record.value)
}
override def stop() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 2b8537b..d88ec41 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -139,8 +139,8 @@ object ReplayLogProducer extends Logging {
stream
for (messageAndMetadata <- iter) {
try {
- val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
- messageAndMetadata.key(), messageAndMetadata.message()))
+ val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic, null,
+ messageAndMetadata.timestamp, messageAndMetadata.key(), messageAndMetadata.message()))
if(config.isSync) {
response.get()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 1c2023c..e20b061 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -220,7 +220,8 @@ object SimpleConsumerShell extends Logging {
System.out.println("next offset = " + offset)
val message = messageAndOffset.message
val key = if(message.hasKey) Utils.readBytes(message.key) else null
- formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
+ val value = if (message.isNull()) null else Utils.readBytes(message.payload)
+ formatter.writeTo(key, value, message.timestamp, message.timestampType, System.out)
numMessagesConsumed += 1
} catch {
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index bc3a6ce..f15c005 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -14,8 +14,10 @@ package kafka.api
import java.util
+import kafka.coordinator.GroupCoordinator
import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
@@ -24,11 +26,10 @@ import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
-import org.junit.{Test, Before}
+import org.junit.{Before, Test}
-import scala.collection.mutable.Buffer
import scala.collection.JavaConverters._
-import kafka.coordinator.GroupCoordinator
+import scala.collection.mutable.Buffer
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
@@ -75,7 +76,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
assertEquals(1, this.consumers(0).assignment.size)
this.consumers(0).seek(tp, 0)
- consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0)
// check async commit callbacks
val commitCallback = new CountConsumerCommitCallback()
@@ -245,7 +246,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
sendRecords(5)
consumer0.subscribe(List(topic).asJava)
- consumeAndVerifyRecords(consumer0, 5, 0)
+ consumeAndVerifyRecords(consumer = consumer0, numRecords = 5, startingOffset = 0)
consumer0.pause(tp)
// subscribe to a new topic to trigger a rebalance
@@ -253,7 +254,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// after rebalance, our position should be reset and our pause state lost,
// so we should be able to consume from the beginning
- consumeAndVerifyRecords(consumer0, 0, 5)
+ consumeAndVerifyRecords(consumer = consumer0, numRecords = 0, startingOffset = 5)
}
protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
@@ -276,26 +277,33 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
protected def sendRecords(numRecords: Int, tp: TopicPartition) {
- sendRecords(this.producers(0), numRecords, tp)
- }
-
- protected def sendRecords(producer: Producer[Array[Byte], Array[Byte]],
- numRecords: Int,
- tp: TopicPartition) {
(0 until numRecords).foreach { i =>
- producer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
+ this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes))
}
- producer.flush()
+ this.producers(0).flush()
}
- protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
- startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
+ protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+ numRecords: Int,
+ startingOffset: Int,
+ startingKeyAndValueIndex: Int = 0,
+ startingTimestamp: Long = 0L,
+ timestampType: TimestampType = TimestampType.CREATE_TIME,
+ tp: TopicPartition = tp) {
val records = consumeRecords(consumer, numRecords)
+ val now = System.currentTimeMillis()
for (i <- 0 until numRecords) {
val record = records.get(i)
val offset = startingOffset + i
assertEquals(tp.topic(), record.topic())
assertEquals(tp.partition(), record.partition())
+ if (timestampType == TimestampType.CREATE_TIME) {
+ assertEquals(timestampType, record.timestampType())
+ val timestamp = startingTimestamp + i
+ assertEquals(timestamp.toLong, record.timestamp())
+ } else
+ assertTrue(s"Got unexpected timestamp ${record.timestamp()}. Timestamp should be between [$startingTimestamp, $now}]",
+ record.timestamp() >= startingTimestamp && record.timestamp() <= now)
assertEquals(offset.toLong, record.offset())
val keyAndValueIndex = startingKeyAndValueIndex + i
assertEquals(s"key $keyAndValueIndex", new String(record.key()))
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 42928a3..807b8bb 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -17,19 +17,21 @@
package kafka.api
-import java.io.File
import java.util.Properties
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig
import kafka.message.Message
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
+import org.apache.kafka.common.record.TimestampType
import org.junit.Assert._
import org.junit.{After, Before, Test}
+
import scala.collection.mutable.Buffer
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@@ -54,8 +56,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
super.setUp()
// TODO: we need to migrate to new consumers when 0.9 is final
- consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
- consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
+ consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024 * 1024, "")
+ consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024 * 1024, "")
}
@After
@@ -88,6 +90,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
object callback extends Callback {
var offset = 0L
+
def onCompletion(metadata: RecordMetadata, exception: Exception) {
if (exception == null) {
assertEquals(offset, metadata.offset())
@@ -105,24 +108,24 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
// send a normal record
- val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
+ val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
// send a record with null value should be ok
- val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null)
+ val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, null)
assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
// send a record with null key should be ok
- val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes)
+ val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
// send a record with null part id should be ok
- val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+ val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
// send a record with null topic should fail
try {
- val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
+ val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
producer.send(record4, callback)
fail("Should not allow sending a record without topic")
} catch {
@@ -143,6 +146,81 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
@Test
+ def testSendCompressedMessageWithCreateTime() {
+ val producerProps = new Properties()
+ producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
+ }
+
+ @Test
+ def testSendNonCompressedMessageWithCreateTime() {
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+ sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
+ }
+
+ @Test
+ def testSendCompressedMessageWithLogAppendTime() {
+ val producerProps = new Properties()
+ producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+ }
+
+ @Test
+ def testSendNonCompressedMessageWithLogApendTime() {
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+ sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+ }
+
+ private def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
+ val partition = new Integer(0)
+
+ val baseTimestamp = 123456L
+ val startTime = System.currentTimeMillis()
+
+ object callback extends Callback {
+ var offset = 0L
+ var timestampDiff = 1L
+
+ def onCompletion(metadata: RecordMetadata, exception: Exception) {
+ if (exception == null) {
+ assertEquals(offset, metadata.offset())
+ assertEquals(topic, metadata.topic())
+ if (timestampType == TimestampType.CREATE_TIME)
+ assertEquals(baseTimestamp + timestampDiff, metadata.timestamp())
+ else
+ assertTrue(metadata.timestamp() >= startTime && metadata.timestamp() <= System.currentTimeMillis())
+ assertEquals(partition, metadata.partition())
+ offset += 1
+ timestampDiff += 1
+ } else {
+ fail("Send callback returns the following exception", exception)
+ }
+ }
+ }
+
+ try {
+ // create topic
+ val topicProps = new Properties()
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
+ else
+ topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
+ TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
+
+ for (i <- 1 to numRecords) {
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
+ producer.send(record, callback)
+ }
+ producer.close(5000L, TimeUnit.MILLISECONDS)
+ assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}", numRecords, callback.offset)
+ } finally {
+ producer.close()
+ }
+ }
+
+ @Test
def testWrongSerializer() {
// send a record with a wrong type should receive a serialization exception
try {
@@ -155,7 +233,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
- private def createProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+ private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
@@ -176,7 +254,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
// non-blocking send a list of records
- val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+ val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
for (i <- 1 to numRecords)
producer.send(record0)
val response0 = producer.send(record0)
@@ -212,9 +290,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val leader1 = leaders(partition)
assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
+ val now = System.currentTimeMillis()
val responses =
for (i <- 1 to numRecords)
- yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
+ yield producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, now, null, ("value" + i).getBytes))
val futures = responses.toList
futures.foreach(_.get)
for (future <- futures)
@@ -228,7 +307,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
// make sure the fetched messages also respect the partitioning and ordering
- val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
+ val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -238,7 +317,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// TODO: also check topic and partition after they are added in the return messageSet
for (i <- 0 to numRecords - 1) {
- assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
+ assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes, now, Message.MagicValue_V1), messageSet1(i).message)
assertEquals(i.toLong, messageSet1(i).offset)
}
} finally {
@@ -257,7 +336,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
// Send a message to auto-create the topic
- val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
// double check that the topic is created with leader elected
@@ -277,7 +356,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
- for(i <- 0 until 50) {
+ for (i <- 0 until 50) {
val responses = (0 until numRecords) map (i => producer.send(record))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.flush()
@@ -302,7 +381,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
// Test closing from caller thread.
- for(i <- 0 until 50) {
+ for (i <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
val responses = (0 until numRecords) map (i => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
@@ -349,23 +428,58 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
}
}
- for(i <- 0 until 50) {
+ for (i <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
- // send message to partition 0
- val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
- assertTrue("No request is complete.", responses.forall(!_.isDone()))
- // flush the messages.
- producer.flush()
- assertTrue("All request are complete.", responses.forall(_.isDone()))
- // Check the messages received by broker.
- val fetchResponse = if (leader.get == configs(0).brokerId) {
- consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
- } else {
- consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ try {
+ // send message to partition 0
+ val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
+ assertTrue("No request is complete.", responses.forall(!_.isDone()))
+ // flush the messages.
+ producer.flush()
+ assertTrue("All request are complete.", responses.forall(_.isDone()))
+ // Check the messages received by broker.
+ val fetchResponse = if (leader.get == configs(0).brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ }
+ val expectedNumRecords = (i + 1) * numRecords
+ assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
+ expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+ } finally {
+ producer.close()
}
- val expectedNumRecords = (i + 1) * numRecords
- assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
- expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
}
}
+
+ @Test
+ def testSendWithInvalidCreateTime() {
+ val topicProps = new Properties()
+ topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000");
+ TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
+
+ val producer = createProducer(brokerList = brokerList)
+ try {
+ producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ fail("Should throw CorruptedRecordException")
+ } catch {
+ case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ } finally {
+ producer.close()
+ }
+
+ // Test compressed messages.
+ val producerProps = new Properties()
+ producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+ val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
+ try {
+ compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ fail("Should throw CorruptedRecordException")
+ } catch {
+ case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ } finally {
+ compressedProducer.close()
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b2f96e5..3d7cad3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -18,20 +18,23 @@ import java.util.Properties
import java.util.regex.Pattern
+import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer, ByteArraySerializer}
+import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer, ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
-import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
+import org.apache.kafka.common.record.{CompressionType, TimestampType}
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.junit.Assert._
import org.junit.Test
+
+import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
-import scala.collection.JavaConverters
-import JavaConverters._
/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
class PlaintextConsumerTest extends BaseConsumerTest {
@@ -96,14 +99,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testAutoOffsetReset() {
sendRecords(1)
this.consumers(0).assign(List(tp).asJava)
- consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
}
@Test
def testGroupConsumption() {
sendRecords(10)
this.consumers(0).subscribe(List(topic).asJava)
- consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
}
@Test
@@ -263,7 +266,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertFalse(partitions.isEmpty)
}
- @Test(expected=classOf[InvalidTopicException])
+ @Test(expected = classOf[InvalidTopicException])
def testPartitionsForInvalidTopic() {
this.consumers(0).partitionsFor(";3# ads,{234")
}
@@ -288,7 +291,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.seek(tp, mid)
assertEquals(mid, consumer.position(tp))
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+
+ consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
+ startingTimestamp = mid.toLong)
// Test seek compressed message
sendCompressedMessages(totalRecords.toInt, tp2)
@@ -305,7 +310,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.seek(tp2, mid)
assertEquals(mid, consumer.position(tp2))
consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
- tp = tp2)
+ startingTimestamp = mid.toLong, tp = tp2)
}
private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
@@ -314,17 +319,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
- sendRecords(producer, numRecords, tp)
+ (0 until numRecords).foreach { i =>
+ producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes))
+ }
producer.close()
}
+ @Test
def testPositionAndCommit() {
sendRecords(5)
- // committed() on a partition with no committed offset throws an exception
- intercept[NoOffsetForPartitionException] {
- this.consumers(0).committed(new TopicPartition(topic, 15))
- }
+ assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalArgumentException] {
@@ -337,7 +342,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumers(0).commitSync()
assertEquals(0L, this.consumers(0).committed(tp).offset)
- consumeAndVerifyRecords(this.consumers(0), 5, 0)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
this.consumers(0).commitSync()
assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
@@ -346,19 +351,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// another consumer in the same group should get the same position
this.consumers(1).assign(List(tp).asJava)
- consumeAndVerifyRecords(this.consumers(1), 1, 5)
+ consumeAndVerifyRecords(consumer = this.consumers(1), numRecords = 1, startingOffset = 5)
}
@Test
def testPartitionPauseAndResume() {
sendRecords(5)
this.consumers(0).assign(List(tp).asJava)
- consumeAndVerifyRecords(this.consumers(0), 5, 0)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
this.consumers(0).pause(tp)
sendRecords(5)
assertTrue(this.consumers(0).poll(0).isEmpty)
this.consumers(0).resume(tp)
- consumeAndVerifyRecords(this.consumers(0), 5, 5)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 5)
}
@Test
@@ -397,7 +402,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
// produce a record that is larger than the configured fetch size
- val record = new ProducerRecord[Array[Byte],Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
this.producers(0).send(record)
// consuming a too-large record should fail
@@ -534,7 +539,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
producerProps.put("mock.interceptor.append", appendStr)
- val testProducer = new KafkaProducer[String,String](producerProps, new StringSerializer, new StringSerializer)
+ val testProducer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
// produce records
val numRecords = 10
@@ -567,16 +572,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// commit sync and verify onCommit is called
val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
- testConsumer.commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
+ testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
assertEquals(2, testConsumer.committed(tp).offset)
- assertEquals(commitCountBefore+1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+ assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
// commit async and verify onCommit is called
val commitCallback = new CountConsumerCommitCallback()
- testConsumer.commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
+ testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
awaitCommitCallback(testConsumer, commitCallback)
assertEquals(5, testConsumer.committed(tp).offset)
- assertEquals(commitCountBefore+2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+ assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
testConsumer.close()
testProducer.close()
@@ -593,7 +598,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// produce records
val numRecords = 100
- val testProducer = new KafkaProducer[String,String](this.producerConfig, new StringSerializer, new StringSerializer)
+ val testProducer = new KafkaProducer[String, String](this.producerConfig, new StringSerializer, new StringSerializer)
(0 until numRecords).map { i =>
testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
}.foreach(_.get)
@@ -617,8 +622,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// change subscription to trigger rebalance
val commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
changeConsumerSubscriptionAndValidateAssignment(testConsumer,
- List(topic, topic2), Set(tp, tp2, new TopicPartition(topic2, 0),
- new TopicPartition(topic2, 1)),
+ List(topic, topic2),
+ Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)),
rebalanceListener)
// after rebalancing, we should have reset to the committed positions
@@ -644,14 +649,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
producerProps.put("mock.interceptor.append", appendStr)
- val testProducer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
+ val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
// producing records should succeed
testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
// create consumer with interceptor that has different key and value types from the consumer
this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
- val testConsumer = new KafkaConsumer[Array[Byte],Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
testConsumer.assign(List(tp).asJava)
testConsumer.seek(tp, 0)
@@ -664,6 +669,46 @@ class PlaintextConsumerTest extends BaseConsumerTest {
testProducer.close()
}
+ def testConsumeMessagesWithCreateTime() {
+ val numRecords = 50
+ // Test non-compressed messages
+ sendRecords(numRecords, tp)
+ this.consumers(0).assign(List(tp).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
+ startingTimestamp = 0)
+
+ // Test compressed messages
+ sendCompressedMessages(numRecords, tp2)
+ this.consumers(0).assign(List(tp2).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+ startingTimestamp = 0)
+ }
+
+ @Test
+ def testConsumeMessagesWithLogAppendTime() {
+ val topicName = "testConsumeMessagesWithLogAppendTime"
+ val topicProps = new Properties()
+ topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
+ TestUtils.createTopic(zkUtils, topicName, 2, 2, servers, topicProps)
+
+ val startTime = System.currentTimeMillis()
+ val numRecords = 50
+
+ // Test non-compressed messages
+ val tp1 = new TopicPartition(topicName, 0)
+ sendRecords(numRecords, tp1)
+ this.consumers(0).assign(List(tp1).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
+ startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
+
+ // Test compressed messages
+ val tp2 = new TopicPartition(topicName, 1)
+ sendCompressedMessages(numRecords, tp2)
+ this.consumers(0).assign(List(tp2).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+ startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
+ }
+
def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
// use consumers defined in this class plus one additional consumer
// Use topic defined in this class + one additional topic
@@ -693,7 +738,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
validateGroupAssignment(consumerPollers, subscriptions,
- s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3*maxSessionTimeout)
+ s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3 * maxSessionTimeout)
// done with pollers and consumers
for (poller <- consumerPollers)
@@ -810,7 +855,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// wait until topics get re-assigned and validate assignment
validateGroupAssignment(consumerPollers, subscriptions,
- s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added ${numOfConsumersToAdd} consumer(s)")
+ s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added ${numOfConsumersToAdd} consumer(s)")
}
/**
@@ -844,7 +889,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L)
validateGroupAssignment(consumerPollers, subscriptions,
- s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
+ s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
}
def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 0d401f7..c4a2bd7 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -87,8 +87,9 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
yield ("value" + i).getBytes
// make sure the returned messages are correct
+ val now = System.currentTimeMillis()
val responses = for (message <- messages)
- yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message))
+ yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, now, null, message))
val futures = responses.toList
for ((future, offset) <- futures zip (0 until numRecords)) {
assertEquals(offset.toLong, future.get.offset)
@@ -101,7 +102,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
var index = 0
for (message <- messages) {
- assertEquals(new Message(bytes = message), messageSet(index).message)
+ assertEquals(new Message(bytes = message, now, Message.MagicValue_V1), messageSet(index).message)
assertEquals(index.toLong, messageSet(index).offset)
index += 1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index e4b8854..fafc4b0 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -287,4 +287,5 @@ class RequestResponseSerializationTest extends JUnitSuite {
// new response should have 4 bytes more than the old response since delayTime is an INT32
assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 7e6e765..587abd5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,12 +17,14 @@
package kafka.coordinator
+import org.apache.kafka.common.record.Record
import org.junit.Assert._
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.message.MessageSet
+import kafka.message.{Message, MessageSet}
import kafka.server.{ReplicaManager, KafkaConfig}
import kafka.utils._
+import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
@@ -87,7 +89,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
EasyMock.replay(zkUtils)
- groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager)
+ groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
groupCoordinator.startup()
// add the partition into the owned partition list
@@ -833,9 +835,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
- new PartitionResponse(Errors.NONE.code, 0L)
+ new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
)
)})
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
@@ -909,9 +912,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
- new PartitionResponse(Errors.NONE.code, 0L)
+ new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
)
)})
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -922,6 +926,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val (responseFuture, responseCallback) = setupHeartbeatCallback
EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index a8092de..69218ba 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -261,7 +261,8 @@ class CleanerTest extends JUnitSuite {
log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
// forward offset and append message to next segment at offset Int.MaxValue
- val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue-1), new Message("hello".getBytes, "hello".getBytes))
+ val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue-1),
+ new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
log.append(messageSet, assignOffsets = false)
log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
@@ -448,13 +449,19 @@ class CleanerTest extends JUnitSuite {
def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
def message(key: Int, value: Int) =
- new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+ new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+ bytes = value.toString.getBytes,
+ timestamp = Message.NoTimestamp,
+ magicValue = Message.MagicValue_V1))
def unkeyedMessage(value: Int) =
new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
def deleteMessage(key: Int) =
- new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
+ new ByteBufferMessageSet(new Message(key=key.toString.getBytes,
+ bytes=null,
+ timestamp = Message.NoTimestamp,
+ magicValue = Message.MagicValue_V1))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 95085f4..0179166 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -200,4 +200,72 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
assertEquals(oldposition, tempReopen.length)
}
+ @Test
+ def testMessageFormatConversion() {
+
+ // Prepare messages.
+ val offsets = Seq(0L, 2L)
+ val messagesV0 = Seq(new Message("hello".getBytes, "k1".getBytes, Message.NoTimestamp, Message.MagicValue_V0),
+ new Message("goodbye".getBytes, "k2".getBytes, Message.NoTimestamp, Message.MagicValue_V0))
+ val messageSetV0 = new ByteBufferMessageSet(
+ compressionCodec = NoCompressionCodec,
+ offsetSeq = offsets,
+ messages = messagesV0:_*)
+ val compressedMessageSetV0 = new ByteBufferMessageSet(
+ compressionCodec = DefaultCompressionCodec,
+ offsetSeq = offsets,
+ messages = messagesV0:_*)
+
+ val messagesV1 = Seq(new Message("hello".getBytes, "k1".getBytes, 1L, Message.MagicValue_V1),
+ new Message("goodbye".getBytes, "k2".getBytes, 2L, Message.MagicValue_V1))
+ val messageSetV1 = new ByteBufferMessageSet(
+ compressionCodec = NoCompressionCodec,
+ offsetSeq = offsets,
+ messages = messagesV1:_*)
+ val compressedMessageSetV1 = new ByteBufferMessageSet(
+ compressionCodec = DefaultCompressionCodec,
+ offsetSeq = offsets,
+ messages = messagesV1:_*)
+
+ // Down conversion
+ // down conversion for non-compressed messages
+ var fileMessageSet = new FileMessageSet(tempFile())
+ fileMessageSet.append(messageSetV1)
+ fileMessageSet.flush()
+ var convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0)
+ verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0)
+
+ // down conversion for compressed messages
+ fileMessageSet = new FileMessageSet(tempFile())
+ fileMessageSet.append(compressedMessageSetV1)
+ fileMessageSet.flush()
+ convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0)
+ verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0)
+
+ // Up conversion. In reality we only do down conversion, but up conversion should work as well.
+ // up conversion for non-compressed messages
+ fileMessageSet = new FileMessageSet(tempFile())
+ fileMessageSet.append(messageSetV0)
+ fileMessageSet.flush()
+ convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
+ verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)
+
+ // up conversion for compressed messages
+ fileMessageSet = new FileMessageSet(tempFile())
+ fileMessageSet.append(compressedMessageSetV0)
+ fileMessageSet.flush()
+ convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
+ verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)
+
+ def verifyConvertedMessageSet(convertedMessageSet: MessageSet, magicByte: Byte) {
+ var i = 0
+ for (messageAndOffset <- convertedMessageSet) {
+ assertEquals("magic byte should be 1", magicByte, messageAndOffset.message.magic)
+ assertEquals("offset should not change", offsets(i), messageAndOffset.offset)
+ assertEquals("key should not change", messagesV0(i).key, messageAndOffset.message.key)
+ assertEquals("payload should not change", messagesV0(i).payload, messageAndOffset.message.payload)
+ i += 1
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index de3d7a3..6b91611 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -99,7 +99,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
if (entry.message.compressionCodec == NoCompressionCodec)
Stream.cons(entry, Stream.empty).iterator
else
- ByteBufferMessageSet.deepIterator(entry.message)
+ ByteBufferMessageSet.deepIterator(entry)
}) yield {
val key = TestUtils.readString(messageAndOffset.message.key).toInt
val value = TestUtils.readString(messageAndOffset.message.payload).toInt
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 51cd62c..1be9e65 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -61,6 +61,7 @@ class LogConfigTest {
case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
+ case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
}
})
@@ -70,7 +71,7 @@ class LogConfigTest {
values.foreach((value) => {
val props = new Properties
props.setProperty(name, value.toString)
- intercept[ConfigException] {
+ intercept[Exception] {
LogConfig(props)
}
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 46bfbed..91a4449 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,6 +20,7 @@ package kafka.log
import java.io._
import java.util.Properties
+import kafka.api.ApiVersion
import kafka.common._
import kafka.server.OffsetCheckpoint
import kafka.utils._
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 47908e7..426b5e8 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -21,6 +21,7 @@ import java.io._
import java.util.Properties
import java.util.concurrent.atomic._
import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+import kafka.api.ApiVersion
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
@@ -132,6 +133,8 @@ class LogTest extends JUnitSuite {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+ // We use need to use magic value 1 here because the test is message size sensitive.
+ logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
// create a log
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -160,6 +163,8 @@ class LogTest extends JUnitSuite {
def testAppendAndReadWithSequentialOffsets() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+ // We use need to use magic value 1 here because the test is message size sensitive.
+ logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
@@ -264,7 +269,8 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages) {
val messages = log.read(offset, 1024*1024).messageSet
assertEquals("Offsets not equal", offset, messages.head.offset)
- assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message)
+ assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message,
+ messages.head.message.toFormatVersion(messageSets(i).head.message.magic))
offset = messages.head.offset + 1
}
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
@@ -290,7 +296,7 @@ class LogTest extends JUnitSuite {
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
- def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head.message)
+ def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head)
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
@@ -343,6 +349,8 @@ class LogTest extends JUnitSuite {
val configSegmentSize = messageSet.sizeInBytes - 1
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
+ // We use need to use magic value 1 here because the test is message size sensitive.
+ logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
try {
@@ -355,8 +363,8 @@ class LogTest extends JUnitSuite {
@Test
def testCompactedTopicConstraints() {
- val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes)
- val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes)
+ val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
+ val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
@@ -404,7 +412,7 @@ class LogTest extends JUnitSuite {
@Test
def testMessageSizeCheck() {
val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
- val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
+ val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change (I need more bytes)".getBytes))
// append messages to log
val maxMessageSize = second.sizeInBytes - 1