Posted to users@kafka.apache.org by "hsy541@gmail.com" <hs...@gmail.com> on 2013/10/14 21:05:10 UTC

KafkaStream bug?

I found some weird behavior.
I followed the exact code example for the high-level consumer:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example#

but added one debug line here:
"
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            // ---- my line ----
            // This line blocks. Is KafkaStream.toString() a blocking method?
            System.out.println("from the stream" + m_stream);
            // ---- end of my line ----

            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
"

Re: KafkaStream bug?

Posted by Joel Koshy <jj...@gmail.com>.
This is probably because KafkaStream is a Scala Iterable, and toString
on an Iterable walks every element, so on a stream it waits for
messages. Per the Scaladoc, toString "returns a string representation
of this collection. By default this string consists of the stringPrefix
of this immutable iterable collection, followed by all elements
separated by commas and enclosed in parentheses."

Joel


On Mon, Oct 14, 2013 at 12:23 PM, Bruno D. Rodrigues
<br...@litux.org> wrote:
> Yes, it's kind of blocking. It basically tries to consume every message and build a representation of them all, much like List.toString(). Why that would make sense, I have no idea. I'm just mentioning that I made the same mistake trying to get something out of toString() for the connection, and got scared when I stepped inside the code, mostly because I don't know much Scala. And yes, it confused me to see Eclipse report toString() as the non-overridden version from Object; yet stepping in with the debugger does enter the Kafka code and shows non-Java code that tries to "consume them all". I may be completely wrong, though.

Re: KafkaStream bug?

Posted by "Bruno D. Rodrigues" <br...@litux.org>.
Yes, it's kind of blocking. It basically tries to consume every message and build a representation of them all, much like List.toString(). Why that would make sense, I have no idea. I'm just mentioning that I made the same mistake trying to get something out of toString() for the connection, and got scared when I stepped inside the code, mostly because I don't know much Scala. And yes, it confused me to see Eclipse report toString() as the non-overridden version from Object; yet stepping in with the debugger does enter the Kafka code and shows non-Java code that tries to "consume them all". I may be completely wrong, though.
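
To illustrate the behavior described in this thread, here is a minimal, self-contained sketch in plain Java. It does not use Kafka at all: CountingStream is a hypothetical stand-in for an iterable whose toString() walks every element, and identityOf is a hypothetical helper that labels an object for debug output without calling toString(). On a real stream whose iterator waits for messages, the same element-walking toString() would presumably never return, which matches what was observed above.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;

public class ToStringDemo {

    /** Hypothetical stand-in (not a Kafka class): toString() iterates all elements. */
    public static class CountingStream implements Iterable<String> {
        /** Counts how many elements have been pulled from any iterator. */
        public final AtomicInteger consumed = new AtomicInteger();
        private final int size;

        public CountingStream(int size) {
            this.size = size;
        }

        @Override
        public Iterator<String> iterator() {
            return new Iterator<String>() {
                private int i = 0;

                @Override
                public boolean hasNext() {
                    // A real message stream would block here until data arrives.
                    return i < size;
                }

                @Override
                public String next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    consumed.incrementAndGet();
                    return "msg" + i++;
                }
            };
        }

        /** Mimics a collection-style toString(): prefix, then every element. */
        @Override
        public String toString() {
            StringJoiner joiner = new StringJoiner(", ", "CountingStream(", ")");
            for (String s : this) {
                joiner.add(s);
            }
            return joiner.toString();
        }
    }

    /** Hypothetical helper: identifies an object without invoking its toString(). */
    public static String identityOf(Object o) {
        return o.getClass().getSimpleName() + "@"
                + Integer.toHexString(System.identityHashCode(o));
    }

    public static void main(String[] args) {
        CountingStream stream = new CountingStream(3);

        // Safe for debug output: no elements are touched.
        System.out.println("from the stream " + identityOf(stream));
        System.out.println("consumed so far: " + stream.consumed.get());

        // String concatenation calls toString(), which walks the whole stream.
        System.out.println("from the stream " + stream);
        System.out.println("consumed so far: " + stream.consumed.get());
    }
}
```

In the run() loop from the original post, the equivalent change would be to print the consumed message (or an identity label like the one above) instead of concatenating m_stream into the string.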

