Posted to dev@kafka.apache.org by Mohanraj Nagasamy <te...@gmail.com> on 2020/07/02 00:29:24 UTC

Feedback: Print schemaId using bin/kafka-dump-log.sh

When I dump Kafka logs to diagnose or debug a problem, it's handy to see
whether a log message carries a schemaId. If it does, print the schemaId.

I'm soliciting feedback on whether this change is worth making to the
kafka-core codebase.

I'm new to the Kafka community, so forgive me if this isn't the right
process.

This is the change I made:

```
 core/src/main/scala/kafka/tools/DumpLogSegments.scala | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 9e9546a92..a8750ac3d 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -35,6 +35,7 @@ object DumpLogSegments {

   // visible for testing
   private[tools] val RecordIndent = "|"
+  private val MAGIC_BYTE = 0x0

   def main(args: Array[String]): Unit = {
     val opts = new DumpLogSegmentsOptions(args)
@@ -277,8 +278,24 @@ object DumpLogSegments {
              }
            } else if (printContents) {
              val (key, payload) = parser.parse(record)
-              key.foreach(key => print(s" key: $key"))
-              payload.foreach(payload => print(s" payload: $payload"))
+              key.foreach(key => {
+                val keyBuffer = record.key()
+                if (keyBuffer.get() == MAGIC_BYTE) {
+                  print(s" keySchemaId: ${keyBuffer.getInt} key: $key")
+                }
+                else {
+                  print(s" key: $key")
+                }
+              })
+
+              payload.foreach(payload => {
+                val valueBuffer = record.value()
+                if (valueBuffer.get() == MAGIC_BYTE) {
+                  print(s" payloadSchemaId: ${valueBuffer.getInt} payload: $payload")
+                } else {
+                  print(s" payload: $payload")
+                }
+              })
             }
             println()
           }
```
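For reference, the check relies on the Confluent Schema Registry wire format: a serialized key or value starts with a 0x0 magic byte followed by a 4-byte big-endian schema id. Here is a minimal, self-contained sketch of the same check (object and method names are mine, purely for illustration); it guards the buffer length and reads from a duplicate, since keyBuffer.get() in the patch above advances the record buffer's position and could throw on an empty buffer:

```
import java.nio.ByteBuffer

object SchemaIdCheck {
  // Confluent wire format: [0x0 magic byte][4-byte big-endian schema id][serialized data]
  private val MagicByte: Byte = 0x0

  // Returns Some(schemaId) if the buffer starts with the magic byte, None otherwise.
  // Reads from a duplicate so the caller's read position is left untouched.
  def schemaId(buffer: ByteBuffer): Option[Int] = {
    val buf = buffer.duplicate()
    if (buf.remaining() >= 5 && buf.get() == MagicByte) Some(buf.getInt)
    else None
  }

  def main(args: Array[String]): Unit = {
    val framed = ByteBuffer.allocate(9)
    framed.put(MagicByte).putInt(42).putInt(12345) // schema id 42, then dummy data
    framed.flip()
    println(schemaId(framed))                            // Some(42)
    println(schemaId(ByteBuffer.wrap("plain".getBytes))) // None
  }
}
```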

And this is how the output looks:

```
$ bin/kafka-dump-log.sh --files data/kafka/logdir/avro_topic_test-0/00000000000000000000.log --print-data-log

| offset: 50 CreateTime: 1593570942959 keysize: -1 valuesize: 16 sequence: -1 headerKeys: [] payloadSchemaId: 1 payload:
TracRowe
baseOffset: 51 lastOffset: 51 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 2918 CreateTime: 1593570958044 size: 101 magic: 2 compresscodec: NONE crc: 1913155179 isvalid: true
| offset: 51 CreateTime: 1593570958044 keysize: 3 valuesize: 30 sequence: -1 headerKeys: [] key: ... payloadSchemaId: 2 payload: .iRKoMVeoVVnTmQEuqwSTHZQ
baseOffset: 52 lastOffset: 52 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 3019 CreateTime: 1593570969001 size: 84 magic: 2 compresscodec: NONE crc: 2188466405 isvalid: true
```

-Mohan

Re: Feedback: Print schemaId using bin/kafka-dump-log.sh

Posted by Mohanraj Nagasamy <te...@gmail.com>.
Thanks Adam for providing the feedback! 

On 7/7/20, 5:05 AM, "Adam Bellemare" <ad...@gmail.com> wrote:

    Hi Mohanraj

    While I see the usefulness of your suggestion, the main issue is that
    you're using the Confluent schema registry's conventions and hardwiring
    them into Kafka core. Given that Confluent's standards are not part of
    Kafka's official standards, I do not think you will get approval to submit
    this code into Kafka core.

    There may be Confluent tools that are available that already do this, or
    perhaps they have their own custom tools available where this may be more
    suitable for submission.

    Adam



    On Mon, Jul 6, 2020 at 11:00 AM Mohanraj Nagasamy <te...@gmail.com>
    wrote:

    > [quoted message trimmed]

Re: Feedback: Print schemaId using bin/kafka-dump-log.sh

Posted by Adam Bellemare <ad...@gmail.com>.
Hi Mohanraj

While I see the usefulness of your suggestion, the main issue is that
you're using the Confluent schema registry's conventions and hardwiring
them into Kafka core. Given that Confluent's standards are not part of
Kafka's official standards, I do not think you will get approval to submit
this code into Kafka core.

There may be Confluent tools that are available that already do this, or
perhaps they have their own custom tools available where this may be more
suitable for submission.
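
For example, kafka-dump-log.sh already accepts pluggable --key-decoder-class / --value-decoder-class options, so a custom decoder on the classpath could surface the schema id without touching core. A rough, untested sketch under that assumption (the class name is hypothetical):

```
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// Hypothetical decoder, not part of Kafka or Confluent: prints the schema id
// when the bytes follow the Confluent wire format, else just the raw string.
class SchemaIdAwareDecoder(props: VerifiableProperties) extends Decoder[String] {
  private val MagicByte: Byte = 0x0

  override def fromBytes(bytes: Array[Byte]): String = {
    if (bytes != null && bytes.length >= 5 && bytes(0) == MagicByte) {
      // Magic byte, then a 4-byte big-endian schema id, then the serialized data.
      val schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt
      s"schemaId: $schemaId data: ${new String(bytes, 5, bytes.length - 5, StandardCharsets.UTF_8)}"
    } else {
      new String(bytes, StandardCharsets.UTF_8)
    }
  }
}
```

Something like bin/kafka-dump-log.sh --files ... --print-data-log --value-decoder-class com.example.SchemaIdAwareDecoder would then do the job, assuming the class is packaged onto the tool's classpath.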

Adam



On Mon, Jul 6, 2020 at 11:00 AM Mohanraj Nagasamy <te...@gmail.com>
wrote:

> [quoted message trimmed]

Re: Feedback: Print schemaId using bin/kafka-dump-log.sh

Posted by Mohanraj Nagasamy <te...@gmail.com>.
Does anyone have feedback on this? ☺

From: Mohanraj Nagasamy <te...@gmail.com>
Date: Wednesday, July 1, 2020 at 6:29 PM
To: "dev@kafka.apache.org" <de...@kafka.apache.org>
Subject: Feedback: Print schemaId using bin/kafka-dump-log.sh

[original message quoted in full; see the top of this thread]