Posted to users@kafka.apache.org by David Harris <dh...@avum.com> on 2012/10/05 19:18:53 UTC

StreamCorruptedException when running ZkUtils.getCluster(zKclient);

I’m writing a simple Java program to play around with the Kafka API and I’m
running into an issue trying to get the Cluster object.  The code is:

ZkClient client = new ZkClient(KafkaProperties.zkConnect, 6000);
Cluster cluster = ZkUtils.getCluster(client);

But I’m getting the following error from the ZkUtils.getCluster(client)
method when I try to run it:

Exception in thread "main" org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 3139322E
        at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)
        at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:162)
        at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:204)
        at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:203)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
        at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
        at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:203)
        at kafka.utils.ZkUtils.getCluster(ZkUtils.scala)
        at com.company.ClassName.main(ClassName.java:44)
Caused by: java.io.StreamCorruptedException: invalid stream header: 3139322E
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:783)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:280)
        at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)
        ... 15 more
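
For reference, the header value in the exception can be decoded by hand. The sketch below uses only the JDK: it turns the hex 3139322E into ASCII, then feeds plain text to ObjectInputStream to see what error comes back. The class name StreamHeaderDemo, the two helper methods, and the sample string 192.168.1.10 are made up for illustration; the actual znode contents are not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

public class StreamHeaderDemo {

    // Convert a hex string such as "3139322E" into the ASCII text it encodes.
    static String hexToAscii(String hex) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < hex.length(); i += 2) {
            sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        return sb.toString();
    }

    // Try to open plain text as a Java serialization stream and describe
    // the failure, if any. The sample text is an assumption, not real data.
    static String describeFailure(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        try {
            // The constructor reads and validates the 4-byte stream header.
            new ObjectInputStream(new ByteArrayInputStream(bytes)).close();
            return "no error";
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(hexToAscii("3139322E"));
        System.out.println(describeFailure("192.168.1.10"));
    }
}
```

The first line printed is `192.`, which looks like the start of an IP address stored as plain text rather than a Java-serialized object (a serialized stream starts with the bytes AC ED 00 05). That is consistent with the data in ZooKeeper having been written as a string while the client tries to read it with Java serialization.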



I’m not sure if I’m doing something wrong or misusing the API. Can anyone
offer me any guidance?


Thanks
David Harris

Re: StreamCorruptedException when running ZkUtils.getCluster(zKclient);

Posted by David Harris <dh...@avum.com>.
Thanks for the response, Joel!

I'm not reading or writing any messages from the topics; I'm just trying to
connect to get information about the Kafka cluster.  So unless I
misunderstand, I didn't think I needed a serializer. If I do, which one do I
need?

The code I'm running is:
Cluster cluster = ZkUtils.getCluster(client);

And that's what is throwing the exception.

I'm running a single node of ZooKeeper and Kafka on a CentOS box, and if I
run the Java code from that CentOS box it does work. I'm getting the
exception while connecting from a Windows machine.  Both the CentOS box and
the Windows machine are running Java 6, but the JVMs are from different
vendors (one is IBM, the other is Sun).  I'm guessing that is the issue, but
I'm not sure whether changing the JVM will do the trick.  I will try it
tomorrow. Any other ideas that I might try?


David Harris
x6016
901-562-0929 - office
520-248-2193 - cell



On Fri, Oct 5, 2012 at 6:05 PM, Joel Koshy <jj...@gmail.com> wrote:

> Hi David,
>
> The ZkClient needs a ZkSerializer: https://gist.github.com/3842975
>
> Thanks,
>
> Joel

Re: StreamCorruptedException when running ZkUtils.getCluster(zKclient);

Posted by Joel Koshy <jj...@gmail.com>.
Hi David,

The ZkClient needs a ZkSerializer: https://gist.github.com/3842975
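
In case the gist is not reachable, here is a minimal sketch of what a string-based serializer could look like. It is not necessarily what the gist contains. To keep it compilable without the zkclient jar, it declares a local stand-in interface with the same two methods as org.I0Itec.zkclient.serialize.ZkSerializer; the names StringSerializerSketch and StringSerializer, and the sample string, are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class StringSerializerSketch {

    // Stand-in for org.I0Itec.zkclient.serialize.ZkSerializer so this sketch
    // compiles on its own; a real program would implement the library
    // interface instead.
    interface ZkSerializer {
        byte[] serialize(Object data);
        Object deserialize(byte[] bytes);
    }

    // Treats znode contents as UTF-8 text instead of Java-serialized objects.
    static class StringSerializer implements ZkSerializer {
        @Override
        public byte[] serialize(Object data) {
            return ((String) data).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Object deserialize(byte[] bytes) {
            // A znode may hold no data; pass that through as null.
            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ZkSerializer serializer = new StringSerializer();
        byte[] raw = serializer.serialize("192.168.1.10");
        System.out.println(serializer.deserialize(raw));
        // With zkclient on the classpath, the serializer would be handed to
        // the client when it is constructed, for example:
        //   new ZkClient(KafkaProperties.zkConnect, 6000, 6000, serializer);
    }
}
```

The four-argument ZkClient constructor (servers, session timeout, connection timeout, serializer) replaces the default SerializableSerializer seen in the stack trace. Kafka's own Scala code has a ZKStringSerializer object in kafka.utils that does the same job, if your version includes it.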

Thanks,

Joel
