You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Job-Selina Wu <sw...@gmail.com> on 2015/07/28 02:54:07 UTC

no new topic created on Kafka

Hi, Dear All:

       I have two Tasks at Samza. HttpDemoParserStreamTask and
HttpDemoStatsStreamTask. They are almost same, except the output topic name
is different and the task name are different at properties file. I am
wondering how should I debug on it?

   More details are list below.

   All your help is highly appreciated.

Sincerely,
Selina

    Currently HttpDemoParserStreamTask run well.
    However HttpDemoStatsStreamTask can generate the log correctly withouot
Exception at
deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
samza-container-0.log

The last record as below is right, however there is no topic "
demo-stats-temp" was created.
--------------------------------------

2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
Galaxy S6","operationSystem":"Android
5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}


-------------------The demo-stats.properties files-----------------------------

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-stats-tmp


 task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
 task.checkpoint.system=kafka
 # Normally, this would be 3, but we have only one broker.
 task.checkpoint.replication.factor=1

 # YARN
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

 # Task
 task.class=samza.http.demo.task.HttpDemoParserStreamTask
 task.inputs=kafka.http-demo

 # Serializers
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.msg.serde=string

 systems.kafka.samza.key.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092

 #stream from begining
 #systems.kafka.consumer.auto.offset.reset=smallest
#http-demo from the oldest
 systems.kafka.http-demo.samza.offset.default=oldest
# all stream from the oldest
 systems.kafka.streams.http-demo.samza.offset.default=oldest
 systems.kafka.streams.http-demo.samza.reset.offset=true



--------------------HttpDemoStatsStreamTask class----------------------------

public class HttpDemoStatsStreamTask implements StreamTask  {

    //output topic
    private static final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "demo-stats-temp");
    Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator) throws
Exception {


        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        logger.info("key=" + key + ": message=" + message);

        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }
}

-----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------

{"Partition 0":0}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}

Re: no new topic created on Kafka

Posted by Job-Selina Wu <sw...@gmail.com>.
Hi, Yan:

     Thanks for fixing the bug for me.

Sincerely,
Selina

On Tue, Jul 28, 2015 at 12:03 PM, Yan Fang <ya...@gmail.com> wrote:

>  task.class=samza.http.demo.task.HttpDemoParserStreamTask ...
>
> you are not using the StateStream class...
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu <sw...@gmail.com>
> wrote:
>
> > Hi, Yan
> >
> > I like to correct my previous comment, when I comment out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> > *the logger is not show at *at samza-container-0.log, but it make sense.
> >
> >
> > Sincerely,
> > Seina
> >
> > On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <sw...@gmail.com>
> > wrote:
> >
> > > Hi, Yan:
> > >
> > >       Thanks a lot for your reply.
> > >      I tried to comment out
> > systems.kafka.http-demo.samza.offset.default=oldest
> > > and then I tried to comment out
> > > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > > systems.kafka.streams.http-demo.samza.reset.offset=true
> > >
> > >  The result is same as before.  1. the checkoutpoint topic was created,
> > 2.
> > > the log created by Logger can be found at /samza-container-0.log. 3. no
> > > exception is at samza-container-0.log.
> > >
> > >    I guess something conflict between HttpDemoParserStreamTask and
> > > HttpDemoStatsStreamTask? Is any resource registered by
> > > HttpDemoParserStreamTask and then HttpDemoStatsStreamTask can not
> > recreate
> > > a topic?
> > >
> > > Sincerely,
> > > Selina
> > >
> > > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <ya...@gmail.com>
> wrote:
> > >
> > >> Can you comment out
> > "systems.kafka.http-demo.samza.offset.default=oldest"
> > >> to see how it works? This seems not a correct property.
> > >>
> > >> Thanks,
> > >>
> > >> Fang, Yan
> > >> yanfang724@gmail.com
> > >>
> > >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucareer99@gmail.com
> >
> > >> wrote:
> > >>
> > >> > Hi, Dear All:
> > >> >
> > >> >        I have two Tasks at Samza. HttpDemoParserStreamTask and
> > >> > HttpDemoStatsStreamTask. They are almost same, except the output
> topic
> > >> name
> > >> > is different and the task name are different at properties file. I
> am
> > >> > wondering how should I debug on it?
> > >> >
> > >> >    More details are list below.
> > >> >
> > >> >    All your help is highly appreciated.
> > >> >
> > >> > Sincerely,
> > >> > Selina
> > >> >
> > >> >     Currently HttpDemoParserStreamTask run well.
> > >> >     However HttpDemoStatsStreamTask can generate the log correctly
> > >> withouot
> > >> > Exception at
> > >> >
> > >> >
> > >>
> >
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
> > >> > samza-container-0.log
> > >> >
> > >> > The last record as below is right, however there is no topic "
> > >> > demo-stats-temp" was created.
> > >> > --------------------------------------
> > >> >
> > >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> > >> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
> > >> >
> > >> >
> > >>
> >
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> > >> >
> > >>
> >
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> > >> >
> > >>
> >
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> > >> > Galaxy S6","operationSystem":"Android
> > >> >
> > >> >
> > >>
> >
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> > >> >
> > >> >
> > >> > -------------------The demo-stats.properties
> > >> > files-----------------------------
> > >> >
> > >> > # Job
> > >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > >> > job.name=demo-stats-tmp
> > >>
> > >> >
> > >> >
> > >> >
> > >> >
> > >>
> >
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > >> >  task.checkpoint.system=kafka
> > >> >  # Normally, this would be 3, but we have only one broker.
> > >> >  task.checkpoint.replication.factor=1
> > >> >
> > >> >  # YARN
> > >> >
> > >> >
> > >>
> >
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> > >> >
> > >> >  # Task
> > >> >  task.class=samza.http.demo.task.HttpDemoParserStreamTask
> > >> >  task.inputs=kafka.http-demo
> > >> >
> > >> >  # Serializers
> > >> >
> > >> >
> > >>
> >
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > >> >
> > >> >  # Kafka System
> > >> >
> > >> >
> > >>
> >
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > >> >  systems.kafka.samza.msg.serde=string
> > >> >
> > >> >  systems.kafka.samza.key.serde=string
> > >> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > >> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> > >> >
> > >> >  #stream from begining
> > >> >  #systems.kafka.consumer.auto.offset.reset=smallest
> > >> > #http-demo from the oldest
> > >> >  systems.kafka.http-demo.samza.offset.default=oldest
> > >> > # all stream from the oldest
> > >> >  systems.kafka.streams.http-demo.samza.offset.default=oldest
> > >> >  systems.kafka.streams.http-demo.samza.reset.offset=true
> > >> >
> > >> >
> > >> >
> > >> > --------------------HttpDemoStatsStreamTask
> > >> > class----------------------------
> > >> >
> > >> > public class HttpDemoStatsStreamTask implements StreamTask  {
> > >> >
> > >> >     //output topic
> > >> >     private static final SystemStream OUTPUT_STREAM = new
> > >> > SystemStream("kafka", "demo-stats-temp");
> > >> >     Logger logger =
> > >> LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> > >> >
> > >> >     @SuppressWarnings("unchecked")
> > >> >     @Override
> > >> >     public void process(IncomingMessageEnvelope envelope,
> > >> > MessageCollector collector, TaskCoordinator coordinator) throws
> > >> > Exception {
> > >> >
> > >> >
> > >> >         String key = (String) envelope.getKey();
> > >> >         String message = envelope.getMessage().toString();
> > >> >         logger.info("key=" + key + ": message=" + message);
> > >>
> > >> >
> > >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> > >> > message));
> > >> >     }
> > >> > }
> > >> >
> > >> > -----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
> > >> > topic--------------
> > >> >
> > >> > {"Partition 0":0}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >>
> > 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo,
> > >> >
> > >> >
> > >>
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> >
> > >>
> > >
> > >
> >
>

Re: no new topic created on Kafka

Posted by Yan Fang <ya...@gmail.com>.
 task.class=samza.http.demo.task.HttpDemoParserStreamTask ...

you are not using the StateStream class...

Fang, Yan
yanfang724@gmail.com

On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu <sw...@gmail.com>
wrote:

> Hi, Yan
>
> I like to correct my previous comment, when I comment out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> *the logger is not show at *at samza-container-0.log, but it make sense.
>
>
> Sincerely,
> Seina
>
> On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <sw...@gmail.com>
> wrote:
>
> > Hi, Yan:
> >
> >       Thanks a lot for your reply.
> >      I tried to comment out
> systems.kafka.http-demo.samza.offset.default=oldest
> > and then I tried to comment out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> >  The result is same as before.  1. the checkoutpoint topic was created,
> 2.
> > the log created by Logger can be found at /samza-container-0.log. 3. no
> > exception is at samza-container-0.log.
> >
> >    I guess something conflict between HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask? Is any resource registered by
> > HttpDemoParserStreamTask and then HttpDemoStatsStreamTask can not
> recreate
> > a topic?
> >
> > Sincerely,
> > Selina
> >
> > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <ya...@gmail.com> wrote:
> >
> >> Can you comment out
> "systems.kafka.http-demo.samza.offset.default=oldest"
> >> to see how it works? This seems not a correct property.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang724@gmail.com
> >>
> >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <sw...@gmail.com>
> >> wrote:
> >>
> >> > Hi, Dear All:
> >> >
> >> >        I have two Tasks at Samza. HttpDemoParserStreamTask and
> >> > HttpDemoStatsStreamTask. They are almost same, except the output topic
> >> name
> >> > is different and the task name are different at properties file. I am
> >> > wondering how should I debug on it?
> >> >
> >> >    More details are list below.
> >> >
> >> >    All your help is highly appreciated.
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >> >     Currently HttpDemoParserStreamTask run well.
> >> >     However HttpDemoStatsStreamTask can generate the log correctly
> >> withouot
> >> > Exception at
> >> >
> >> >
> >>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
> >> > samza-container-0.log
> >> >
> >> > The last record as below is right, however there is no topic "
> >> > demo-stats-temp" was created.
> >> > --------------------------------------
> >> >
> >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> >> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
> >> >
> >> >
> >>
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> >> >
> >>
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> >> >
> >>
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> >> > Galaxy S6","operationSystem":"Android
> >> >
> >> >
> >>
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >> >
> >> >
> >> > -------------------The demo-stats.properties
> >> > files-----------------------------
> >> >
> >> > # Job
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=demo-stats-tmp
> >>
> >> >
> >> >
> >> >
> >> >
> >>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> >  task.checkpoint.system=kafka
> >> >  # Normally, this would be 3, but we have only one broker.
> >> >  task.checkpoint.replication.factor=1
> >> >
> >> >  # YARN
> >> >
> >> >
> >>
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >> >
> >> >  # Task
> >> >  task.class=samza.http.demo.task.HttpDemoParserStreamTask
> >> >  task.inputs=kafka.http-demo
> >> >
> >> >  # Serializers
> >> >
> >> >
> >>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> >
> >> >  # Kafka System
> >> >
> >> >
> >>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> >  systems.kafka.samza.msg.serde=string
> >> >
> >> >  systems.kafka.samza.key.serde=string
> >> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> >> >
> >> >  #stream from begining
> >> >  #systems.kafka.consumer.auto.offset.reset=smallest
> >> > #http-demo from the oldest
> >> >  systems.kafka.http-demo.samza.offset.default=oldest
> >> > # all stream from the oldest
> >> >  systems.kafka.streams.http-demo.samza.offset.default=oldest
> >> >  systems.kafka.streams.http-demo.samza.reset.offset=true
> >> >
> >> >
> >> >
> >> > --------------------HttpDemoStatsStreamTask
> >> > class----------------------------
> >> >
> >> > public class HttpDemoStatsStreamTask implements StreamTask  {
> >> >
> >> >     //output topic
> >> >     private static final SystemStream OUTPUT_STREAM = new
> >> > SystemStream("kafka", "demo-stats-temp");
> >> >     Logger logger =
> >> LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> >> >
> >> >     @SuppressWarnings("unchecked")
> >> >     @Override
> >> >     public void process(IncomingMessageEnvelope envelope,
> >> > MessageCollector collector, TaskCoordinator coordinator) throws
> >> > Exception {
> >> >
> >> >
> >> >         String key = (String) envelope.getKey();
> >> >         String message = envelope.getMessage().toString();
> >> >         logger.info("key=" + key + ": message=" + message);
> >>
> >> >
> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> >> > message));
> >> >     }
> >> > }
> >> >
> >> > -----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
> >> > topic--------------
> >> >
> >> > {"Partition 0":0}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> >
> >>
> >
> >
>

Re: no new topic created on Kafka

Posted by Job-Selina Wu <sw...@gmail.com>.
Hi, Yan

I like to correct my previous comment, when I comment out
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true

*the logger is not show at *at samza-container-0.log, but it make sense.


Sincerely,
Seina

On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <sw...@gmail.com>
wrote:

> Hi, Yan:
>
>       Thanks a lot for your reply.
>      I tried to comment out systems.kafka.http-demo.samza.offset.default=oldest
> and then I tried to comment out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
>  The result is same as before.  1. the checkoutpoint topic was created, 2.
> the log created by Logger can be found at /samza-container-0.log. 3. no
> exception is at samza-container-0.log.
>
>    I guess something conflict between HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask? Is any resource registered by
> HttpDemoParserStreamTask and then HttpDemoStatsStreamTask can not recreate
> a topic?
>
> Sincerely,
> Selina
>
> On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <ya...@gmail.com> wrote:
>
>> Can you comment out  "systems.kafka.http-demo.samza.offset.default=oldest"
>> to see how it works? This seems not a correct property.
>>
>> Thanks,
>>
>> Fang, Yan
>> yanfang724@gmail.com
>>
>> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <sw...@gmail.com>
>> wrote:
>>
>> > Hi, Dear All:
>> >
>> >        I have two Tasks at Samza. HttpDemoParserStreamTask and
>> > HttpDemoStatsStreamTask. They are almost same, except the output topic
>> name
>> > is different and the task name are different at properties file. I am
>> > wondering how should I debug on it?
>> >
>> >    More details are list below.
>> >
>> >    All your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> >     Currently HttpDemoParserStreamTask run well.
>> >     However HttpDemoStatsStreamTask can generate the log correctly
>> withouot
>> > Exception at
>> >
>> >
>> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
>> > samza-container-0.log
>> >
>> > The last record as below is right, however there is no topic "
>> > demo-stats-temp" was created.
>> > --------------------------------------
>> >
>> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
>> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
>> >
>> >
>> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
>> >
>> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
>> >
>> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
>> > Galaxy S6","operationSystem":"Android
>> >
>> >
>> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>> >
>> >
>> > -------------------The demo-stats.properties
>> > files-----------------------------
>> >
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=demo-stats-tmp
>>
>> >
>> >
>> >
>> >
>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> >  task.checkpoint.system=kafka
>> >  # Normally, this would be 3, but we have only one broker.
>> >  task.checkpoint.replication.factor=1
>> >
>> >  # YARN
>> >
>> >
>> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>> >
>> >  # Task
>> >  task.class=samza.http.demo.task.HttpDemoParserStreamTask
>> >  task.inputs=kafka.http-demo
>> >
>> >  # Serializers
>> >
>> >
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> >  # Kafka System
>> >
>> >
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> >  systems.kafka.samza.msg.serde=string
>> >
>> >  systems.kafka.samza.key.serde=string
>> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> >  systems.kafka.producer.bootstrap.servers=localhost:9092
>> >
>> >  #stream from begining
>> >  #systems.kafka.consumer.auto.offset.reset=smallest
>> > #http-demo from the oldest
>> >  systems.kafka.http-demo.samza.offset.default=oldest
>> > # all stream from the oldest
>> >  systems.kafka.streams.http-demo.samza.offset.default=oldest
>> >  systems.kafka.streams.http-demo.samza.reset.offset=true
>> >
>> >
>> >
>> > --------------------HttpDemoStatsStreamTask
>> > class----------------------------
>> >
>> > public class HttpDemoStatsStreamTask implements StreamTask  {
>> >
>> >     //output topic
>> >     private static final SystemStream OUTPUT_STREAM = new
>> > SystemStream("kafka", "demo-stats-temp");
>> >     Logger logger =
>> LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>> >
>> >     @SuppressWarnings("unchecked")
>> >     @Override
>> >     public void process(IncomingMessageEnvelope envelope,
>> > MessageCollector collector, TaskCoordinator coordinator) throws
>> > Exception {
>> >
>> >
>> >         String key = (String) envelope.getKey();
>> >         String message = envelope.getMessage().toString();
>> >         logger.info("key=" + key + ": message=" + message);
>>
>> >
>> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>> > message));
>> >     }
>> > }
>> >
>> > -----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
>> > topic--------------
>> >
>> > {"Partition 0":0}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo,
>> >
>> >
>> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> >
>>
>
>

Re: no new topic created on Kafka

Posted by Job-Selina Wu <sw...@gmail.com>.
Hi, Yan:

      Thanks a lot for your reply.
     I tried to comment out systems.kafka.http-demo.samza.offset.default=oldest
and then I tried to comment out
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true

 The result is same as before.  1. the checkoutpoint topic was created, 2.
the log created by Logger can be found at /samza-container-0.log. 3. no
exception is at samza-container-0.log.

   I guess something conflict between HttpDemoParserStreamTask and
HttpDemoStatsStreamTask? Is any resource registered by
HttpDemoParserStreamTask and then HttpDemoStatsStreamTask can not recreate
a topic?

Sincerely,
Selina

On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <ya...@gmail.com> wrote:

> Can you comment out  "systems.kafka.http-demo.samza.offset.default=oldest"
> to see how it works? This seems not a correct property.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <sw...@gmail.com>
> wrote:
>
> > Hi, Dear All:
> >
> >        I have two Tasks at Samza. HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask. They are almost same, except the output topic
> name
> > is different and the task name are different at properties file. I am
> > wondering how should I debug on it?
> >
> >    More details are list below.
> >
> >    All your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> >     Currently HttpDemoParserStreamTask run well.
> >     However HttpDemoStatsStreamTask can generate the log correctly
> withouot
> > Exception at
> >
> >
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
> > samza-container-0.log
> >
> > The last record as below is right, however there is no topic "
> > demo-stats-temp" was created.
> > --------------------------------------
> >
> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
> >
> >
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> >
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> >
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> > Galaxy S6","operationSystem":"Android
> >
> >
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >
> >
> > -------------------The demo-stats.properties
> > files-----------------------------
> >
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=demo-stats-tmp
> >
> >
> >
> >
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >  task.checkpoint.system=kafka
> >  # Normally, this would be 3, but we have only one broker.
> >  task.checkpoint.replication.factor=1
> >
> >  # YARN
> >
> >
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >
> >  # Task
> >  task.class=samza.http.demo.task.HttpDemoParserStreamTask
> >  task.inputs=kafka.http-demo
> >
> >  # Serializers
> >
> >
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> >  # Kafka System
> >
> >
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >  systems.kafka.samza.msg.serde=string
> >
> >  systems.kafka.samza.key.serde=string
> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> >
> >  #stream from begining
> >  #systems.kafka.consumer.auto.offset.reset=smallest
> > #http-demo from the oldest
> >  systems.kafka.http-demo.samza.offset.default=oldest
> > # all stream from the oldest
> >  systems.kafka.streams.http-demo.samza.offset.default=oldest
> >  systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> >
> >
> > --------------------HttpDemoStatsStreamTask
> > class----------------------------
> >
> > public class HttpDemoStatsStreamTask implements StreamTask  {
> >
> >     //output topic
> >     private static final SystemStream OUTPUT_STREAM = new
> > SystemStream("kafka", "demo-stats-temp");
> >     Logger logger =
> LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> >
> >     @SuppressWarnings("unchecked")
> >     @Override
> >     public void process(IncomingMessageEnvelope envelope,
> > MessageCollector collector, TaskCoordinator coordinator) throws
> > Exception {
> >
> >
> >         String key = (String) envelope.getKey();
> >         String message = envelope.getMessage().toString();
> >         logger.info("key=" + key + ": message=" + message);
> >
> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> > message));
> >     }
> > }
> >
> > -----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
> > topic--------------
> >
> > {"Partition 0":0}
> > {"SystemStreamPartition [kafka, http-demo,
> > 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo,
> >
> >
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >
>

Re: no new topic created on Kafka

Posted by Yan Fang <ya...@gmail.com>.
Can you comment out  "systems.kafka.http-demo.samza.offset.default=oldest"
to see how it works? This seems not a correct property.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <sw...@gmail.com>
wrote:

> Hi, Dear All:
>
>        I have two Tasks at Samza. HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask. They are almost same, except the output topic name
> is different and the task name are different at properties file. I am
> wondering how should I debug on it?
>
>    More details are list below.
>
>    All your help is highly appreciated.
>
> Sincerely,
> Selina
>
>     Currently HttpDemoParserStreamTask run well.
>     However HttpDemoStatsStreamTask can generate the log correctly withouot
> Exception at
>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
> samza-container-0.log
>
> The last record as below is right, however there is no topic "
> demo-stats-temp" was created.
> --------------------------------------
>
> 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
>
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> Galaxy S6","operationSystem":"Android
>
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>
>
> -------------------The demo-stats.properties
> files-----------------------------
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=demo-stats-tmp
>
>
>
>  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>  task.checkpoint.system=kafka
>  # Normally, this would be 3, but we have only one broker.
>  task.checkpoint.replication.factor=1
>
>  # YARN
>
>  yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>
>  # Task
>  task.class=samza.http.demo.task.HttpDemoParserStreamTask
>  task.inputs=kafka.http-demo
>
>  # Serializers
>
>  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
>  # Kafka System
>
>  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>  systems.kafka.samza.msg.serde=string
>
>  systems.kafka.samza.key.serde=string
>  systems.kafka.consumer.zookeeper.connect=localhost:2181/
>  systems.kafka.producer.bootstrap.servers=localhost:9092
>
>  #stream from begining
>  #systems.kafka.consumer.auto.offset.reset=smallest
> #http-demo from the oldest
>  systems.kafka.http-demo.samza.offset.default=oldest
> # all stream from the oldest
>  systems.kafka.streams.http-demo.samza.offset.default=oldest
>  systems.kafka.streams.http-demo.samza.reset.offset=true
>
>
>
> --------------------HttpDemoStatsStreamTask
> class----------------------------
>
> public class HttpDemoStatsStreamTask implements StreamTask  {
>
>     //output topic
>     private static final SystemStream OUTPUT_STREAM = new
> SystemStream("kafka", "demo-stats-temp");
>     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void process(IncomingMessageEnvelope envelope,
> MessageCollector collector, TaskCoordinator coordinator) throws
> Exception {
>
>
>         String key = (String) envelope.getKey();
>         String message = envelope.getMessage().toString();
>         logger.info("key=" + key + ": message=" + message);
>
>         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> message));
>     }
> }
>
> -----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
> topic--------------
>
> {"Partition 0":0}
> {"SystemStreamPartition [kafka, http-demo,
> 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo,
>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>