Posted to users@kafka.apache.org by Furkan KAMACI <fu...@gmail.com> on 2016/11/02 19:13:08 UTC

Kafka Streams Error

I use Kafka 0.10.0.1. I count the messages of a topic as follows:

...
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
...
KStream<String, String> longs =
        builder.stream(Serdes.String(), Serdes.String(), "qps-input");
...
KTable<Windowed<String>, Long> longCounts =
        longs.countByKey(TimeWindows.of("qps", 3600 * 1000), Serdes.String());
...

and then I write the output to another topic. The result is:

Numbers that start from 1 and increase whenever I add something to
qps-input.
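
Pieced together, a minimal compilable version of this (0.10.0.x API)
could look like the sketch below; the application id and bootstrap
server are placeholder assumptions, and the output wiring is the one
shown later in this thread:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class QpsCounter {
        public static void main(String[] args) {
            Properties streamsConfiguration = new Properties();
            streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "qps-counter"); // assumption
            streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> longs =
                    builder.stream(Serdes.String(), Serdes.String(), "qps-input");

            // count records per key in 1-hour windows
            KTable<Windowed<String>, Long> longCounts =
                    longs.countByKey(TimeWindows.of("qps", 3600 * 1000L), Serdes.String());

            // drop the window part of the key and write the counts out
            longCounts.toStream((wk, v) -> wk.key())
                    .to(Serdes.String(), Serdes.Long(), "qps-aggregated");

            new KafkaStreams(builder, streamsConfiguration).start();
        }
    }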

My questions:

1) Does it really calculate only the last hour, or everything from the
beginning, because I've set it to "earliest"?

2) Sometimes it gets reset and the numbers start from 1 again. What can
be the reason for that?

Kind Regards,
Furkan KAMACI

Re: Kafka Streams Error

Posted by "Matthias J. Sax" <ma...@confluent.io>.

First, a hint about "group.id". Please read this to make sense of the
parameter:

http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management
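
In short, offsets are committed per "group.id": restarting a consumer
with the same group.id resumes from that group's committed offsets,
while a brand-new group.id has no committed offsets and falls back to
"auto.offset.reset". A configuration sketch (the broker address is an
assumption):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption
    props.put("group.id", "qps-consumer");      // names the consumer group owning the offsets
    props.put("enable.auto.commit", "true");    // periodically commit this group's offsets
    props.put("auto.offset.reset", "earliest"); // where a group with no committed offsets starts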

It might also help to understand how to get the "last value" of a
topic. I also want to mention that the "last value" is a moving target
in general, as new data might be appended at any time; thus, the
definition of "last value" is not completely sound.

For my code snippet below, I assume you have a topic with no running
producers, so no new data gets appended. For this case, you need to
seek() to the "end minus one" offset and afterwards poll() for the
record.

The simplest way might be to use something like this:

> Map<TopicPartition, Long> headOffsets = consumer.endOffsets(partitions);
> for (Map.Entry<TopicPartition, Long> topicPlusOffset : headOffsets.entrySet()) {
>     consumer.seek(topicPlusOffset.getKey(), topicPlusOffset.getValue() - 1);
> }

Afterwards you can just call poll() and it should return the last
message for each partition (depending on your number of partitions,
though not necessarily for all partitions in a single call to poll()).
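
Spelled out end to end, a minimal sketch could look like the following.
The topic name, group id, bootstrap server, and the number of poll()
retries are assumptions, and note that endOffsets() needs a 0.10.1+
client, if I remember correctly:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class LastValueReader {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption
            props.put("group.id", "last-value-reader");       // assumption
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.LongDeserializer");

            try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                // assign() rather than subscribe(): offsets are managed
                // manually here, so group management would only interfere
                List<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo info : consumer.partitionsFor("qps-aggregated")) {
                    partitions.add(new TopicPartition(info.topic(), info.partition()));
                }
                consumer.assign(partitions);

                // seek every non-empty partition to "end minus one"
                for (Map.Entry<TopicPartition, Long> end :
                        consumer.endOffsets(partitions).entrySet()) {
                    if (end.getValue() > 0) {
                        consumer.seek(end.getKey(), end.getValue() - 1);
                    }
                }

                // a single poll() need not cover every partition, so retry a few times
                for (int i = 0; i < 10; i++) {
                    for (ConsumerRecord<String, Long> record : consumer.poll(500)) {
                        System.out.printf("partition = %d, offset = %d, key = %s, value = %d%n",
                                record.partition(), record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }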


Hope this helps.

-Matthias


On 11/3/16 5:26 AM, Furkan KAMACI wrote:
> I've just realised what the parameter of the poll() method does.
> It's documented as:
> 
> "The time, in milliseconds, spent waiting in poll if data is not
> available in the buffer."
> 
> When I set it to a big number, I can "sometimes" see a result. When I
> set it to 0 and push something to the topic it listens to, I still
> only "sometimes" see a result.
> 
> What I want is to get the last value of that topic.
> 
> Kind Regards, Furkan KAMACI

Re: Kafka Streams Error

Posted by Furkan KAMACI <fu...@gmail.com>.
I've just realised what the parameter of the poll() method does. It's
documented as:

"The time, in milliseconds, spent waiting in poll if data is not
available in the buffer."

When I set it to a big number, I can "sometimes" see a result. When I
set it to 0 and push something to the topic it listens to, I still only
"sometimes" see a result.

What I want is to get the last value of that topic.
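
The usual pattern is to call poll() in a loop rather than relying on a
single call, since one call may time out before any fetch completes and
return an empty batch; that is exactly the "sometimes" effect. A minimal
sketch, assuming a consumer configured and subscribed as in the next
message:

    // poll() in a loop; a single poll() returns an empty batch whenever
    // it times out before data arrives
    static void pollLoop(KafkaConsumer<String, Long> consumer) {
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(100); // wait up to 100 ms
            for (ConsumerRecord<String, Long> record : records) {
                System.out.printf("offset = %d, key = %s, value = %d%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }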

Kind Regards,
Furkan KAMACI


Re: Kafka Streams Error

Posted by Furkan KAMACI <fu...@gmail.com>.
Hi Matthias,

Thanks for the response. I stream the output as follows:

        longCounts.toStream((wk, v) -> wk.key())
                .to(Serdes.String(),
                        Serdes.Long(),
                        "qps-aggregated");

I want to read the last value from that topic in another application.
I've tried this:

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "qps-consumer"); // I don't know the real purpose of this setting
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.LongDeserializer");
        // Long values, to match the LongDeserializer above
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("qps-aggregated"));
        ConsumerRecords<String, Long> records = consumer.poll(1);
        for (ConsumerRecord<String, Long> record : records) {
            System.out.printf("Connected! offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }

I can see that there is data when I check the streamed topic
(qps-aggregated) from the command line. However, I cannot get any result
from that subscription in my application. What can be the reason?
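
Going by the defaults, a likely explanation is twofold: a consumer group
with no committed offsets starts at "latest" and therefore sees only
records produced after it subscribes, and poll(1) waits at most one
millisecond, which is rarely enough for the first fetch to complete. A
sketch of the corresponding changes:

        // a group with no committed offsets starts at "latest" by default
        // and only sees records produced after subscribing; start earlier
        props.put("auto.offset.reset", "earliest");
        // ...and give poll() more than 1 ms so the fetch can complete
        ConsumerRecords<String, Long> records = consumer.poll(1000);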

Kind Regards,
Furkan KAMACI


Re: Kafka Streams Error

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Hi,

first, AUTO_OFFSET_RESET_CONFIG only has an effect when you start your
application for the first time. If you start it a second time, it will
resume from where it left off.

About the numbers starting over from 1: this is expected behavior,
because Streams **updates** the window computation each time an input
record is added to the window, so you see each intermediate result.

Furthermore, each time a new window is created, you will see a "1"
again in the output, as this is the current count of the new window. If
you want to distinguish windows in the output, you need to look at the
key: it encodes the original record key as well as a window ID.
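
For example, the window start time can be pulled out of the Windowed
key when converting the KTable to a stream; a sketch against the
longCounts table from the original snippet (the "key@windowStart"
format and the output topic name are illustrations only):

    // expose the window boundary in the output key so consecutive
    // windows can be told apart downstream ("key@windowStartMs")
    longCounts.toStream((wk, v) -> wk.key() + "@" + wk.window().start())
            .to(Serdes.String(), Serdes.Long(), "qps-aggregated-windowed"); // hypothetical topic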


-Matthias

On 11/2/16 12:13 PM, Furkan KAMACI wrote:
> I use Kafka 0.10.0.1. I count the messages of a topic as follows:
> 
> ...
> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> ...
> KStream<String, String> longs =
>         builder.stream(Serdes.String(), Serdes.String(), "qps-input");
> ...
> KTable<Windowed<String>, Long> longCounts =
>         longs.countByKey(TimeWindows.of("qps", 3600 * 1000), Serdes.String());
> ...
> 
> and then I write the output to another topic. The result is:
> 
> Numbers that start from 1 and increase whenever I add something to
> qps-input.
> 
> My questions:
> 
> 1) Does it really calculate only the last hour, or everything from the
> beginning, because I've set it to "earliest"?
> 
> 2) Sometimes it gets reset and the numbers start from 1 again. What
> can be the reason for that?
> 
> Kind Regards, Furkan KAMACI