Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/01 03:53:04 UTC
kafka git commit: KAFKA-3273; MessageFormatter and MessageReader interfaces should be resilient to changes
Repository: kafka
Updated Branches:
refs/heads/trunk 74eff8a83 -> 695fdc69d
KAFKA-3273; MessageFormatter and MessageReader interfaces should be resilient to changes
* Change `MessageFormatter.writeTo` to take a `ConsumerRecord`
* Change `MessageReader.readMessage()` to use `ProducerRecord`
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #972 from ijuma/kafka-3273-message-formatter-and-reader-resilient
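A minimal sketch of why taking a single record argument makes `writeTo` resilient to changes. It is illustrative only: `SketchRecord`, `SketchFormatter`, `OffsetAndValueFormatter` and `FormatterSketch` are hypothetical stand-ins so the example runs without Kafka on the classpath; they are not the Kafka classes touched by this commit.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// Hypothetical stand-in for ConsumerRecord[Array[Byte], Array[Byte]] (not the Kafka class).
final case class SketchRecord(topic: String, partition: Int, offset: Long,
                              key: Array[Byte], value: Array[Byte])

// Hypothetical stand-in mirroring the shape of the new MessageFormatter.writeTo.
trait SketchFormatter {
  def writeTo(record: SketchRecord, output: PrintStream): Unit
}

// Reads only the fields it needs. If SketchRecord later gains a field
// (for example a timestamp), this signature and this class stay unchanged.
class OffsetAndValueFormatter extends SketchFormatter {
  def writeTo(record: SketchRecord, output: PrintStream): Unit = {
    val value = Option(record.value).map(bytes => new String(bytes, "UTF-8")).getOrElse("null")
    output.print(s"${record.topic}-${record.partition}@${record.offset}: $value\n")
  }
}

object FormatterSketch {
  // Formats one record into a String by writing to an in-memory PrintStream.
  def render(record: SketchRecord): String = {
    val buffer = new ByteArrayOutputStream()
    val out = new PrintStream(buffer, true, "UTF-8")
    new OffsetAndValueFormatter().writeTo(record, out)
    out.flush()
    buffer.toString("UTF-8")
  }
}
```

With the old style, each new piece of metadata meant another positional parameter on `writeTo` and a source change in every implementation; with a record argument, implementations that ignore the new field need no change.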
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/695fdc69
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/695fdc69
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/695fdc69
Branch: refs/heads/trunk
Commit: 695fdc69db6e080419bb05d624e91fa88d5c0a02
Parents: 74eff8a
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Feb 29 18:52:54 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 29 18:52:54 2016 -0800
----------------------------------------------------------------------
.../scala/kafka/common/MessageFormatter.scala | 39 +++++++++++++
.../main/scala/kafka/common/MessageReader.scala | 39 +++++++++++++
.../coordinator/GroupMetadataManager.scala | 60 +++++++++++---------
.../scala/kafka/tools/ConsoleConsumer.scala | 45 +++++++--------
.../scala/kafka/tools/ConsoleProducer.scala | 40 +++++--------
.../scala/kafka/tools/SimpleConsumerShell.scala | 31 +++++-----
.../unit/kafka/tools/ConsoleConsumerTest.scala | 3 +-
docs/upgrade.html | 11 +++-
8 files changed, 176 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/common/MessageFormatter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageFormatter.scala b/core/src/main/scala/kafka/common/MessageFormatter.scala
new file mode 100644
index 0000000..ef3c723
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageFormatter.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.io.PrintStream
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+/**
+ * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
+ * a `PrintStream`.
+ *
+ * This is used by the `ConsoleConsumer`.
+ */
+trait MessageFormatter {
+
+ def init(props: Properties) {}
+
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
+
+ def close() {}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/common/MessageReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageReader.scala b/core/src/main/scala/kafka/common/MessageReader.scala
new file mode 100644
index 0000000..56b55ce
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageReader.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.io.InputStream
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.ProducerRecord
+
+/**
+ * Typical implementations of this interface convert data from an `InputStream` received via `init` into a
+ * `ProducerRecord` instance on each invocation of `readMessage`.
+ *
+ * This is used by the `ConsoleProducer`.
+ */
+trait MessageReader {
+
+ def init(inputStream: InputStream, props: Properties) {}
+
+ def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
+
+ def close() {}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index b3e1bc1..2c29172 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -22,23 +22,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.CoreUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
import org.apache.kafka.common.protocol.types.Type.STRING
import org.apache.kafka.common.protocol.types.Type.INT32
import org.apache.kafka.common.protocol.types.Type.INT64
import org.apache.kafka.common.protocol.types.Type.BYTES
-import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.Time
-
+import org.apache.kafka.clients.consumer.ConsumerRecord
import kafka.utils._
import kafka.common._
import kafka.message._
import kafka.log.FileMessageSet
import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
-import kafka.tools.MessageFormatter
+import kafka.common.MessageFormatter
import kafka.server.ReplicaManager
import scala.collection._
@@ -968,37 +967,46 @@ object GroupMetadataManager {
// Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
// (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
class OffsetsMessageFormatter extends MessageFormatter {
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
- val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
- // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
- // only print if the message is an offset record
- if (formattedKey.isInstanceOf[OffsetKey]) {
- val groupTopicPartition = formattedKey.asInstanceOf[OffsetKey].toString
- val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
- output.write(groupTopicPartition.getBytes)
- output.write("::".getBytes)
- output.write(formattedValue.getBytes)
- output.write("\n".getBytes)
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+ Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+ // Only print if the message is an offset record.
+ // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+ case offsetKey: OffsetKey =>
+ val groupTopicPartition = offsetKey.key
+ val value = consumerRecord.value
+ val formattedValue =
+ if (value == null) "NULL"
+ else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+ output.write(groupTopicPartition.toString.getBytes)
+ output.write("::".getBytes)
+ output.write(formattedValue.getBytes)
+ output.write("\n".getBytes)
+ case _ => // no-op
}
}
}
// Formatter for use with tools to read group metadata history
class GroupMetadataMessageFormatter extends MessageFormatter {
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
- val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
- // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
- // only print if the message is a group metadata record
- if (formattedKey.isInstanceOf[GroupMetadataKey]) {
- val groupId = formattedKey.asInstanceOf[GroupMetadataKey].key
- val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
- output.write(groupId.getBytes)
- output.write("::".getBytes)
- output.write(formattedValue.getBytes)
- output.write("\n".getBytes)
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+ Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+ // Only print if the message is a group metadata record.
+ // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+ case groupMetadataKey: GroupMetadataKey =>
+ val groupId = groupMetadataKey.key
+ val value = consumerRecord.value
+ val formattedValue =
+ if (value == null) "NULL"
+ else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+ output.write(groupId.getBytes)
+ output.write("::".getBytes)
+ output.write(formattedValue.getBytes)
+ output.write("\n".getBytes)
+ case _ => // no-op
}
}
}
+
}
case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7aee7ab..855025e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -20,20 +20,21 @@ package kafka.tools
import java.io.PrintStream
import java.util.concurrent.CountDownLatch
import java.util.{Properties, Random}
+
import joptsimple._
-import kafka.common.StreamEndException
+import kafka.common.{MessageFormatter, StreamEndException}
import kafka.consumer._
import kafka.message._
import kafka.metrics.KafkaMetricsReporter
import kafka.utils._
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.{Deserializer, ByteArrayDeserializer}
+import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Consumer that dumps messages to standard out.
@@ -126,7 +127,8 @@ object ConsoleConsumer extends Logging {
}
messageCount += 1
try {
- formatter.writeTo(msg.key, msg.value, msg.timestamp, msg.timestampType, System.out)
+ formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
+ msg.key, msg.value), System.out)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
@@ -285,7 +287,7 @@ object ConsoleConsumer extends Logging {
val fromBeginning = options.has(resetBeginningOpt)
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
- val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+ val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
@@ -310,9 +312,9 @@ object ConsoleConsumer extends Logging {
}
//Provide the consumer with a randomly assigned group id
- if(!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"console-consumer-" + new Random().nextInt(100000))
- groupIdPassed=false
+ if (!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
+ groupIdPassed = false
}
def tryParse(parser: OptionParser, args: Array[String]) = {
@@ -336,14 +338,6 @@ object ConsoleConsumer extends Logging {
}
}
-trait MessageFormatter{
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream)
-
- def init(props: Properties) {}
-
- def close() {}
-}
-
class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
var printTimestamp = false
@@ -370,7 +364,7 @@ class DefaultMessageFormatter extends MessageFormatter {
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
}
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
@@ -379,6 +373,8 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(separator)
}
+ import consumerRecord._
+
if (printTimestamp) {
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(s"$timestampType:$timestamp".getBytes)
@@ -386,6 +382,7 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(s"NO_TIMESTAMP".getBytes)
output.write(keySeparator)
}
+
if (printKey) write(keyDeserializer, key, keySeparator)
write(valueDeserializer, value, lineSeparator)
}
@@ -395,9 +392,10 @@ class LoggingMessageFormatter extends MessageFormatter {
private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
val logger = Logger.getLogger(getClass().getName)
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream): Unit = {
- defaultWriter.writeTo(key, value, timestamp, timestampType, output)
- if(logger.isInfoEnabled)
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
+ import consumerRecord._
+ defaultWriter.writeTo(consumerRecord, output)
+ if (logger.isInfoEnabled)
logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
s"key:${if (key == null) "null" else new String(key)}, " +
s"value:${if (value == null) "null" else new String(value)}")
@@ -407,7 +405,7 @@ class LoggingMessageFormatter extends MessageFormatter {
class NoOpMessageFormatter extends MessageFormatter {
override def init(props: Properties) {}
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream){}
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream){}
}
class ChecksumMessageFormatter extends MessageFormatter {
@@ -421,7 +419,8 @@ class ChecksumMessageFormatter extends MessageFormatter {
topicStr = ""
}
- def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+ import consumerRecord._
val chksum =
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
new Message(value, key, timestamp, timestampType, NoCompressionCodec, 0, -1, Message.MagicValue_V1).checksum
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index bce819e..0116a96 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -20,14 +20,13 @@ package kafka.tools
import kafka.common._
import kafka.message._
import kafka.serializer._
-import kafka.utils.{ToolsUtils, CommandLineUtils}
-import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
-
+import kafka.utils.{CommandLineUtils, ToolsUtils}
+import kafka.producer.{NewShinyProducer, OldProducer}
import java.util.Properties
import java.io._
import joptsimple._
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.utils.Utils
object ConsoleProducer {
@@ -52,12 +51,12 @@ object ConsoleProducer {
}
})
- var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+ var message: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
message = reader.readMessage()
- if(message != null)
- producer.send(message.topic, message.key, message.message)
- } while(message != null)
+ if (message != null)
+ producer.send(message.topic, message.key, message.value)
+ } while (message != null)
} catch {
case e: joptsimple.OptionException =>
System.err.println(e.getMessage)
@@ -285,12 +284,6 @@ object ConsoleProducer {
val maxBlockMs = options.valueOf(maxBlockMsOpt)
}
- trait MessageReader {
- def init(inputStream: InputStream, props: Properties) {}
- def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
- def close() {}
- }
-
class LineMessageReader extends MessageReader {
var topic: String = null
var reader: BufferedReader = null
@@ -301,11 +294,11 @@ object ConsoleProducer {
override def init(inputStream: InputStream, props: Properties) {
topic = props.getProperty("topic")
- if(props.containsKey("parse.key"))
+ if (props.containsKey("parse.key"))
parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
- if(props.containsKey("key.separator"))
+ if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator")
- if(props.containsKey("ignore.error"))
+ if (props.containsKey("ignore.error"))
ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
}
@@ -317,17 +310,14 @@ object ConsoleProducer {
case (line, true) =>
line.indexOf(keySeparator) match {
case -1 =>
- if(ignoreError)
- new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
- else
- throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+ if (ignoreError) new ProducerRecord(topic, line.getBytes)
+ else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
case n =>
- new KeyedMessage[Array[Byte], Array[Byte]](topic,
- line.substring(0, n).getBytes,
- (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
+ val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
+ new ProducerRecord(topic, line.substring(0, n).getBytes, value)
}
case (line, false) =>
- new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
+ new ProducerRecord(topic, line.getBytes)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dda9697..b4b68e0 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -21,10 +21,12 @@ import joptsimple._
import kafka.utils._
import kafka.consumer._
import kafka.client.ClientUtils
-import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
+import kafka.api.{FetchRequestBuilder, OffsetRequest, Request}
import kafka.cluster.BrokerEndPoint
+
import scala.collection.JavaConversions._
-import kafka.common.TopicAndPartition
+import kafka.common.{MessageFormatter, TopicAndPartition}
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.utils.Utils
/**
@@ -137,7 +139,7 @@ object SimpleConsumerShell extends Logging {
// validating partition id
val partitionsMetadata = topicsMetadata(0).partitionsMetadata
val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
- if(!partitionMetadataOpt.isDefined) {
+ if (!partitionMetadataOpt.isDefined) {
System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
System.exit(1)
}
@@ -145,9 +147,9 @@ object SimpleConsumerShell extends Logging {
// validating replica id and initializing target broker
var fetchTargetBroker: BrokerEndPoint = null
var replicaOpt: Option[BrokerEndPoint] = null
- if(replicaId == UseLeaderReplica) {
+ if (replicaId == UseLeaderReplica) {
replicaOpt = partitionMetadataOpt.get.leader
- if(!replicaOpt.isDefined) {
+ if (!replicaOpt.isDefined) {
System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
System.exit(1)
}
@@ -186,7 +188,7 @@ object SimpleConsumerShell extends Logging {
}
// initializing formatter
- val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+ val formatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
val replicaString = if(replicaId > 0) "leader" else "replica"
@@ -202,7 +204,7 @@ object SimpleConsumerShell extends Logging {
var offset = startingOffset
var numMessagesConsumed = 0
try {
- while(numMessagesConsumed < maxMessages) {
+ while (numMessagesConsumed < maxMessages) {
val fetchRequest = fetchRequestBuilder
.addFetch(topic, partitionId, offset, fetchSize)
.build()
@@ -213,15 +215,16 @@ object SimpleConsumerShell extends Logging {
return
}
debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
- for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
+ for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages) {
try {
offset = messageAndOffset.nextOffset
- if(printOffsets)
+ if (printOffsets)
System.out.println("next offset = " + offset)
val message = messageAndOffset.message
- val key = if(message.hasKey) Utils.readBytes(message.key) else null
+ val key = if (message.hasKey) Utils.readBytes(message.key) else null
val value = if (message.isNull) null else Utils.readBytes(message.payload)
- formatter.writeTo(key, value, message.timestamp, message.timestampType, System.out)
+ formatter.writeTo(new ConsumerRecord(topic, partitionId, offset, message.timestamp,
+ message.timestampType, key, value), System.out)
numMessagesConsumed += 1
} catch {
case e: Throwable =>
@@ -230,7 +233,7 @@ object SimpleConsumerShell extends Logging {
else
throw e
}
- if(System.out.checkError()) {
+ if (System.out.checkError()) {
// This means no one is listening to our output stream any more, time to shutdown
System.err.println("Unable to write to standard out, closing consumer.")
formatter.close()
@@ -242,8 +245,8 @@ object SimpleConsumerShell extends Logging {
} catch {
case e: Throwable =>
error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
- }finally {
- info("Consumed " + numMessagesConsumed + " messages")
+ } finally {
+ info(s"Consumed $numMessagesConsumed messages")
}
}
}, false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index de92a24..31b3211 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -19,6 +19,7 @@ package kafka.tools
import java.io.FileOutputStream
+import kafka.common.MessageFormatter
import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
import kafka.utils.TestUtils
import org.easymock.EasyMock
@@ -39,7 +40,7 @@ class ConsoleConsumerTest extends JUnitSuite {
//Expectations
val messageLimit: Int = 10
- EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
+ EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
EasyMock.replay(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3370822..863a6fa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,9 +65,14 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
<li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. </li>
<li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
<li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
- <li> MessageFormatter interface was changed from <code>void writeTo(byte[] key, byte[] value, PrintStream output)</code> to
- <code>void writeTo(byte[] key, byte[] value, long timestamp, TimestampType timestampType, PrintStream output)</code> </li>
- <li> MirrorMakerMessageHandler no longer exposes <em>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</em> method as it was never called. </li>
+ <li> MessageFormatter interface was changed from <code>def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)</code> to
+ <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li>
+ <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to
+ <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li>
+ </li>
+ <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
+ <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
+ <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
</ul>
<h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</a></h4>
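The `MessageReader` change listed in the upgrade notes above can be sketched in the same way. This is an assumption-laden illustration loosely modelled on the `LineMessageReader` logic in this diff: `SketchProducerRecord`, `SketchReader` and `KeyedLineReader` are hypothetical stand-ins, not Kafka classes, and the error handling of the real reader (`ignore.error`) is left out.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}

// Hypothetical stand-in for ProducerRecord[Array[Byte], Array[Byte]] (not the Kafka class).
final case class SketchProducerRecord(topic: String, key: Array[Byte], value: Array[Byte])

// Hypothetical stand-in mirroring the shape of the MessageReader trait.
trait SketchReader {
  def init(inputStream: InputStream): Unit
  // Returns null at end of input, which is what the ConsoleProducer loop checks for.
  def readMessage(): SketchProducerRecord
}

// Splits each input line at the first occurrence of keySeparator.
// Lines without a separator become records with a null key.
class KeyedLineReader(topic: String, keySeparator: String) extends SketchReader {
  private var reader: BufferedReader = null

  def init(inputStream: InputStream): Unit = {
    reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
  }

  def readMessage(): SketchProducerRecord = {
    val line = reader.readLine()
    if (line == null) null
    else line.indexOf(keySeparator) match {
      case -1 =>
        SketchProducerRecord(topic, null, line.getBytes("UTF-8"))
      case n =>
        SketchProducerRecord(topic,
          line.substring(0, n).getBytes("UTF-8"),
          line.substring(n + keySeparator.length).getBytes("UTF-8"))
    }
  }
}
```

A caller would loop on `readMessage()` until it returns null, sending `topic`, `key` and `value` from each record, as the updated `ConsoleProducer` does with `ProducerRecord`.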