Posted to users@kafka.apache.org by Andrei <fa...@gmail.com> on 2015/04/07 15:40:17 UTC

What is the data format of Kafka's data nodes in ZooKeeper?

I'm trying to read data from ZooKeeper nodes that were written by various
Kafka components. As a specific example (one of many), I'm trying to read
the current offset for a specific consumer group, topic, and partition. As
far as I understand, it is stored under the path

    /consumers/data-processing-team/offsets/unloads/35

I'm using `com.101tec.zkclient` to get the data. I can walk the node tree,
create new nodes, and write and read simple types (e.g. strings). But when
I read data from Kafka-specific nodes like this:

    zkClient.readData("/consumers/data-processing-team/offsets/unloads/35")

I get a decoding error:

    org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 32323737
      at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)
      at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)
      at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)
      at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
      at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
      at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
      ... 64 elided
    Caused by: java.io.StreamCorruptedException: invalid stream header: 32323737
      at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
      at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.<init>(TcclAwareObjectIputStream.java:30)
      at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)
      ... 69 more


As far as I understand, this is caused by the way these nodes are
serialized. What is this format, and how can I read such nodes?
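The hex in the error message is a useful clue. The minimal Java sketch below (stdlib only; the class and method names are hypothetical) decodes `32323737` as ASCII, which yields the digits `2277`, and then feeds those same four bytes to `java.io.ObjectInputStream`, which the stack trace shows zkclient's default `SerializableSerializer` using. Java serialization streams must start with the magic bytes `0xACED`, so plain text is rejected with the same message as above. This suggests the node holds the offset as decimal text rather than a serialized Java object.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

class StreamHeaderDemo {

    // Decodes a hex string such as "32323737" into raw bytes.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Reads the bytes with Java object serialization, as zkclient's default
    // SerializableSerializer does. ObjectInputStream checks for the 0xACED
    // magic header in its constructor. Returns the error message, or null
    // if the header was accepted.
    static String javaDeserializationError(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException e) {
            return e.toString(); // e.g. EOFException for fewer than 4 bytes
        }
    }

    public static void main(String[] args) {
        byte[] header = fromHex("32323737");
        // The "corrupted" header is plain ASCII text.
        System.out.println(new String(header, StandardCharsets.US_ASCII)); // prints 2277
        // Same failure as in the stack trace above.
        System.out.println(javaDeserializationError(header)); // prints invalid stream header: 32323737
    }
}
```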

Re: What is the data format of Kafka's data nodes in ZooKeeper?

Posted by Andrei <fa...@gmail.com>.
Thanks a lot, ZKStringSerializer works like a charm!

For anyone searching for the same question, here's a gist [1] that
instantiates ZkClient and sets the proper serializer.

[1]: https://gist.github.com/jjkoshy/3842975

On Tue, Apr 7, 2015 at 6:49 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Andrei,
>
> Kafka uses string serialization (UTF-8) when writing data to ZK; you can
> find its implementation in kafka.utils.ZKStringSerializer.
>
> Guozhang

Re: What is the data format of Kafka's data nodes in ZooKeeper?

Posted by Guozhang Wang <wa...@gmail.com>.
Andrei,

Kafka uses string serialization (UTF-8) when writing data to ZK; you can
find its implementation in kafka.utils.ZKStringSerializer.

Guozhang
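Kafka's ZKStringSerializer is a Scala object that converts between a String and its UTF-8 bytes, returning null for null input. Below is a minimal Java sketch of the same idea, which may help if you'd rather not pull in Kafka just for the serializer. To stay self-contained it does not declare the zkclient interface; in real code it would implement `org.I0Itec.zkclient.serialize.ZkSerializer`, whose `serialize`/`deserialize` methods it mirrors. The class name `Utf8StringSerializer` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring the two methods of zkclient's ZkSerializer
// interface; a real version would declare
// "implements org.I0Itec.zkclient.serialize.ZkSerializer".
class Utf8StringSerializer {

    // Writes a String as raw UTF-8 bytes: no 0xACED header, no class metadata.
    public byte[] serialize(Object data) {
        return ((String) data).getBytes(StandardCharsets.UTF_8);
    }

    // Reads raw node bytes back as a UTF-8 String; null input stays null.
    public Object deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Utf8StringSerializer serializer = new Utf8StringSerializer();
        byte[] raw = serializer.serialize("2277");
        System.out.println(raw.length);                  // prints 4
        System.out.println(serializer.deserialize(raw)); // prints 2277
    }
}
```

Because the bytes are plain text, values written this way are also readable with ZooKeeper's own command-line client.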

Re: What is the data format of Kafka's data nodes in ZooKeeper?

Posted by Patrick Dignan <pd...@hubspot.com>.
You need to create the ZkClient with kafka.utils.ZKStringSerializer as its
serializer.
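With a string serializer in place, `readData` should return the node value as text, and the offset still has to be parsed into a number. In zkclient the serializer is passed as the last argument of the `ZkClient(String, int, int, ZkSerializer)` constructor. The sketch below assumes the ZooKeeper-based consumer layout `/consumers/<group>/offsets/<topic>/<partition>` seen in the question; the `OffsetNode` class and its methods are hypothetical helpers, and the sample value `2277` is made up for illustration (it only matches the first four bytes shown in the error).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for the offset nodes written by Kafka's
// ZooKeeper-based consumer.
final class OffsetNode {

    private OffsetNode() {}

    // Builds /consumers/<group>/offsets/<topic>/<partition>.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // Decodes the node value as UTF-8 text and parses it as a decimal long.
    static long parseOffset(byte[] nodeData) {
        String text = new String(nodeData, StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }

    public static void main(String[] args) {
        // prints /consumers/data-processing-team/offsets/unloads/35
        System.out.println(offsetPath("data-processing-team", "unloads", 35));
        // Hypothetical node contents; real offsets will differ.
        byte[] data = "2277".getBytes(StandardCharsets.UTF_8);
        System.out.println(parseOffset(data)); // prints 2277
    }
}
```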
