You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Madabhattula Rajesh Kumar <mr...@gmail.com> on 2014/11/29 18:00:36 UTC

Need help on kafka-storm

Hello,

I'm new to Storm and Kafka. I have tried Strom-Kafka integration example
program. Now I'm able to send message from Kafka and receive those messages
in storm topology.

I have observed one thing in storm topology, same messages are processing
continuously

*I have sent three messages (First Message, Second Message, Third Message
). These 3 messages processing continuously, please find below console log
file*


*Could you please help me on below query*

   -
*How to make sure that storm topology process messages one time
   successfully(Not multiple times). *
   - *What configurations I need to do *

*Below is my code :*











* BrokerHosts zk = new ZkHosts("localhost:2181");        SpoutConfig
spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm",
"discovery");        spoutConf.scheme = new SchemeAsMultiScheme(new
StringScheme());        KafkaSpout spout = new KafkaSpout(spoutConf);
    TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);        builder.setBolt("printerbolt",
new PrintBolt()) .shuffleGrouping("spout");        Config config = new
Config();        config.setDebug(true);        LocalCluster cluster = new
LocalCluster();        cluster.submitTopology("kafka", config,
builder.createTopology());*

*Log file :*

25526 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [First Message]
25528 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-5148901491748001310=-1334200518948214946}, [First Message]
*message [First Message]*
25538 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-5148901491748001310 -1334200518948214946 3]
25539 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-5148901491748001310 -1334200518948214946 3]
33530 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Second Message]
33531 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-8623931148894813393=4843611232629293066}, [Second Message]
*message [Second Message]*
33531 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-8623931148894813393 4843611232629293066 3]
33532 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-8623931148894813393 4843611232629293066 3]
38532 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Thrid Message]
38536 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-7749553958395790620=-1739211867328620785}, [Thrid Message]
*message [Thrid Message]*
38537 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-7749553958395790620 -1739211867328620785 3]
38537 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-7749553958395790620 -1739211867328620785 3]
46201 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
76155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
76159 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
printerbolt __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=4,
read_pos=3, capacity=1024, population=1}]> #<DataPoint [__process-latency =
{}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency =
{}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {}]>]]
76202 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
76206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [First Message]
76206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{956790162864404846=494501721511970112}, [First Message]
*message [First Message]*
76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [956790162864404846 494501721511970112 3]
76207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[956790162864404846 494501721511970112 3]
76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Second Message]
76208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-5947127688111528870=-2474569870953878080}, [Second Message]
*message [Second Message]*
76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-5947127688111528870 -2474569870953878080 3]
76208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-5947127688111528870 -2474569870953878080 3]
76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Thrid Message]
76209 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{4790700513589938438=-2542940781190231591}, [Thrid Message]
*message [Thrid Message]*
76209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [4790700513589938438 -2542940781190231591 3]
76209 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[4790700513589938438 -2542940781190231591 3]
76276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
76277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
__system __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881> [#<DataPoint
[__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=39294328,
usedBytes=23489160, maxBytes=1003487232, initBytes=64761920,
virtualFreeBytes=979998072, committedBytes=62783488}]> #<DataPoint
[__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]>
#<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]>
#<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue =
{write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint
[memory/nonHeap = {unusedBytes=171552, usedBytes=41312736,
maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528,
committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]> #<DataPoint
[__transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]>
#<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
[__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
76364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
76365 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
__acker __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=7,
read_pos=6, capacity=1024, population=1}]> #<DataPoint [__process-latency =
{}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency =
{spout:__ack_init=0.0}]> #<DataPoint [__fail-count = {}]> #<DataPoint
[__emit-count = {}]> #<DataPoint [__execute-count =
{spout:__ack_init=20}]>]]
76377 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
76381 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
Kafka Partition partition_1
76382 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=11, read_pos=11,
capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]>
#<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
population=1}]> #<DataPoint [kafkaPartition =
{Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=19,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPICallCount=28451,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPICallCount=28452,
Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPILatencyMean=0.05672711935892029,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=6,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=42,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
[__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
#<DataPoint [__emit-count = {}]>]]
76943 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Refreshing partition manager connections
76949 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
1=rajesh-VirtualBox:9092}}
76949 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
[Partition{host=rajesh-VirtualBox:9092, partition=0},
Partition{host=rajesh-VirtualBox:9092, partition=1}]
76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Deleted partition managers: []
76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
partition managers: []
76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Finished refreshing
106203 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
136154 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
136155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
printerbolt __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=8,
read_pos=7, capacity=1024, population=1}]> #<DataPoint [__process-latency =
{}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency =
{}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {}]>]]
136204 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [First Message]
136206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{3336041025082572443=4848943651836321291}, [First Message]
*message [First Message]*
136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [3336041025082572443 4848943651836321291 3]
136207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: __ack_init, id: {},
[3336041025082572443 4848943651836321291 3]
136207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Second Message]
136208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{8818700006514275130=7403177023020018790}, [Second Message]
*message [Second Message]*
136208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [8818700006514275130 7403177023020018790 3]
136208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: __ack_init, id: {},
[8818700006514275130 7403177023020018790 3]
136209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Thrid Message]
136211 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{7897209966477580404=-5223890645152565221}, [Thrid Message]
*message [Thrid Message]*
136211 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [7897209966477580404 -5223890645152565221 3]
136211 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: __ack_init, id: {},
[7897209966477580404 -5223890645152565221 3]
136276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
136277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
__system __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00> [#<DataPoint
[__ack-count = {}]> #<DataPoint [GC/Copy = {count=26, timeMs=88}]>
#<DataPoint [memory/heap = {unusedBytes=35718120, usedBytes=27065368,
maxBytes=1003487232, initBytes=64761920, virtualFreeBytes=976421864,
committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=2,
read_pos=1, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]>
#<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]>
#<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>
#<DataPoint [GC/MarkSweepCompact = {count=0, timeMs=0}]> #<DataPoint
[__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
#<DataPoint [memory/nonHeap = {unusedBytes=71848, usedBytes=41609048,
maxBytes=224395264, initBytes=24313856, virtualFreeBytes=182786216,
committedBytes=41680896}]> #<DataPoint [uptimeSecs = 136.665]> #<DataPoint
[__transfer = {write_pos=18, read_pos=18, capacity=1024, population=0}]>
#<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
[__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
136364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
136364 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
__acker __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=11,
read_pos=10, capacity=1024, population=1}]> #<DataPoint [__process-latency
= {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency
= {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {spout:__ack_init=0}]>]]
136379 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
136382 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
Kafka Partition partition_1
136383 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=17, read_pos=17,
capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]>
#<DataPoint [__receive = {write_pos=6, read_pos=5, capacity=1024,
population=1}]> #<DataPoint [kafkaPartition =
{Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=15,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPICallCount=30606,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPICallCount=30606,
Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPILatencyMean=0.022773312422400837,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=3,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=16,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint
[__transfer-count = {default=20}]> #<DataPoint [__fail-count =
{default=0}]> #<DataPoint [__emit-count = {default=20}]>]]
136950 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Refreshing partition manager connections
136954 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
1=rajesh-VirtualBox:9092}}
136954 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
[Partition{host=rajesh-VirtualBox:9092, partition=0},
Partition{host=rajesh-VirtualBox:9092, partition=1}]
136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Deleted partition managers: []
136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
partition managers: []
136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Finished refreshing

Regards,
Rajesh

Re: Need help on kafka-storm

Posted by Madabhattula Rajesh Kumar <mr...@gmail.com>.
Thank you very much Harsha

Regards,
Rajesh

On Tue, Dec 2, 2014 at 8:50 AM, Harsha <st...@harsha.io> wrote:

>  Ok from the earlier logs it looks like your tuples are being timed out
> and getting replayed.
> In your PrintBolt.execute do collector.ack(tuple)
> public class PrintBolt extends BaseRichBolt
> {
>     private static final long serialVersionUID = 1L;
>     private OutputCollector collector;
>
>     public void execute(Tuple tuple)
>     {
>          System.out.println("message " + tuple.getValues());
>          collector.ack(tuple);
>     }
>    public void prepare(Map arg0, TopologyContext arg1, OutputCollector
> arg2) {
>     this.collector = arg2;
>    }
>   public void declareOutputFields(OutputFieldsDeclarer arg0) {
>   }
> }
>
>
> On Mon, Dec 1, 2014, at 07:10 PM, Madabhattula Rajesh Kumar wrote:
>
> Thank you Harsha for your response.
>
> I'm just printing the messages in printer bolt.
>
> Please find below printer blot code
>
> *public class PrintBolt extends BaseRichBolt   *
> *{*
> *    private static final long serialVersionUID = 1L;*
> *    public void execute(Tuple tuple) *
> *    {*
> *         System.out.println("message " + tuple.getValues());*
> *    }*
> *   public void prepare(Map arg0, TopologyContext arg1, OutputCollector
> arg2) {*
> *   }*
> *  public void declareOutputFields(OutputFieldsDeclarer arg0) {*
> *  }*
> *}*
>
>  Regards,
> Rajesh
>
>
> On Tue, Dec 2, 2014 at 8:19 AM, Harsha <st...@harsha.io> wrote:
>
>
> Does your printer bolt ack the messages it received from KafkaSpout.
>
>
> On Mon, Dec 1, 2014, at 06:38 PM, Madabhattula Rajesh Kumar wrote:
>
> Hello,
>
> Could any one help me on above mail query?
>
> Regards,
> Rajesh
>
> On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar <
> mrajaforu@gmail.com> wrote:
>
> Hello,
>
> I'm new to Storm and Kafka. I have tried Strom-Kafka integration example
> program. Now I'm able to send message from Kafka and receive those messages
> in storm topology.
>
>  I have observed one thing in storm topology, same messages are
> processing continuously
>
>  *I have sent three messages (First Message, Second Message, Third
> Message ). These 3 messages processing continuously, please find below
> console log file*
>
> *Could you please help me on below query*
>
>    - *How to make sure that storm topology process messages one time
>    successfully(Not multiple times). *
>    - *What configurations I need to do *
>
> *Below is my code :*
>
> * BrokerHosts zk = new ZkHosts("localhost:2181");*
> *        SpoutConfig spoutConf = new SpoutConfig(zk, "test-topic",
> "/kafkastorm", "discovery");*
> *        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());*
> *        KafkaSpout spout = new KafkaSpout(spoutConf);*
> *        TopologyBuilder builder = new TopologyBuilder();*
> *        builder.setSpout("spout", spout, 1);*
> *        builder.setBolt("printerbolt", new PrintBolt())
> .shuffleGrouping("spout");*
> *        Config config = new Config();*
> *        config.setDebug(true);*
> *        LocalCluster cluster = new LocalCluster();*
> *        cluster.submitTopology("kafka", config,
> builder.createTopology());*
>
>  *Log file :*
>
> 25526 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [First Message]
> 25528 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-5148901491748001310=-1334200518948214946}, [First Message]
> *message [First Message]*
> 25538 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-5148901491748001310 -1334200518948214946 3]
> 25539 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-5148901491748001310 -1334200518948214946 3]
> 33530 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Second Message]
> 33531 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-8623931148894813393=4843611232629293066}, [Second Message]
> *message [Second Message]*
> 33531 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-8623931148894813393 4843611232629293066 3]
> 33532 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-8623931148894813393 4843611232629293066 3]
> 38532 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Thrid Message]
> 38536 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-7749553958395790620=-1739211867328620785}, [Thrid Message]
> *message [Thrid Message]*
> 38537 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-7749553958395790620 -1739211867328620785 3]
> 38537 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-7749553958395790620 -1739211867328620785 3]
> 46201 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 76155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76159 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
> printerbolt __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=4, read_pos=3, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 76202 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 76206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [First Message]
> 76206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {956790162864404846=494501721511970112}, [First Message]
> *message [First Message]*
> 76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [956790162864404846 494501721511970112 3]
> 76207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [956790162864404846 494501721511970112 3]
> 76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Second Message]
> 76208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-5947127688111528870=-2474569870953878080}, [Second Message]
> *message [Second Message]*
> 76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Thrid Message]
> 76209 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {4790700513589938438=-2542940781190231591}, [Thrid Message]
> *message [Thrid Message]*
> 76209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [4790700513589938438 -2542940781190231591 3]
> 76209 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [4790700513589938438 -2542940781190231591 3]
> 76276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
> __system __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap =
> {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232,
> initBytes=64761920, virtualFreeBytes=979998072, committedBytes=62783488}]>
> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024,
> population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint
> [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
> #<DataPoint [memory/nonHeap = {unusedBytes=171552, usedBytes=41312736,
> maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528,
> committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]> #<DataPoint
> [__transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]>
> #<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
> 76364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76365 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
> __acker __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=7, read_pos=6, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {spout:__ack_init=0.0}]> #<DataPoint [__fail-count =
> {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count =
> {spout:__ack_init=20}]>]]
> 76377 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76381 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
> Kafka Partition partition_1
> 76382 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=11,
> read_pos=11, capacity=1024, population=0}]> #<DataPoint [__complete-latency
> = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
> population=1}]> #<DataPoint [kafkaPartition =
> {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=19,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPICallCount=28451,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPICallCount=28452,
> Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPILatencyMean=0.05672711935892029,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=6,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=42,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
> [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
> #<DataPoint [__emit-count = {}]>]]
> 76943 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 76949 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
> 1=rajesh-VirtualBox:9092}}
> 76949 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
> [Partition{host=rajesh-VirtualBox:9092, partition=0},
> Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: []
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
> 106203 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 136154 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
> printerbolt __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=8, read_pos=7, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 136204 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [First Message]
> 136206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {3336041025082572443=4848943651836321291}, [First Message]
> *message [First Message]*
> 136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [Second Message]
> 136208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {8818700006514275130=7403177023020018790}, [Second Message]
> *message [Second Message]*
> 136208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [8818700006514275130 7403177023020018790 3]
> 136208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [8818700006514275130 7403177023020018790 3]
> 136209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [Thrid Message]
> 136211 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {7897209966477580404=-5223890645152565221}, [Thrid Message]
> *message [Thrid Message]*
> 136211 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [7897209966477580404 -5223890645152565221 3]
> 136211 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [7897209966477580404 -5223890645152565221 3]
> 136276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
> __system __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26,
> timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120,
> usedBytes=27065368, maxBytes=1003487232, initBytes=64761920,
> virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint
> [__receive = {write_pos=2, read_pos=1, capacity=1024, population=1}]>
> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]>
> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]>
> #<DataPoint [__execute-count = {}]> #<DataPoint [GC/MarkSweepCompact =
> {count=0, timeMs=0}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap =
> {unusedBytes=71848, usedBytes=41609048, maxBytes=224395264,
> initBytes=24313856, virtualFreeBytes=182786216, committedBytes=41680896}]>
> #<DataPoint [uptimeSecs = 136.665]> #<DataPoint [__transfer =
> {write_pos=18, read_pos=18, capacity=1024, population=0}]> #<DataPoint
> [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency = {}]>
> #<DataPoint [__transfer-count = {}]>]]
> 136364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136364 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
> __acker __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=11, read_pos=10, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]]
> 136379 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136382 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
> Kafka Partition partition_1
> 136383 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=17,
> read_pos=17, capacity=1024, population=0}]> #<DataPoint [__complete-latency
> = {}]> #<DataPoint [__receive = {write_pos=6, read_pos=5, capacity=1024,
> population=1}]> #<DataPoint [kafkaPartition =
> {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=15,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPICallCount=30606,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPICallCount=30606,
> Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPILatencyMean=0.022773312422400837,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=3,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=16,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint
> [__transfer-count = {default=20}]> #<DataPoint [__fail-count =
> {default=0}]> #<DataPoint [__emit-count = {default=20}]>]]
> 136950 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 136954 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
> 1=rajesh-VirtualBox:9092}}
> 136954 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1]
> assigned [Partition{host=rajesh-VirtualBox:9092, partition=0},
> Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: []
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
>
> Regards,
> Rajesh
>
>
>
>
>
>
>
>
>

Re: Need help on kafka-storm

Posted by Harsha <st...@harsha.io>.
Ok from the earlier logs it looks like your tuples are being timed out
and getting replayed. In your PrintBolt.execute do collector.ack(tuple)
public class PrintBolt extends BaseRichBolt { private static final long
serialVersionUID = 1L; private OutputCollector collector;

public void execute(Tuple tuple) { System.out.println("message " +
tuple.getValues()); collector.ack(tuple); } public void prepare(Map
arg0, TopologyContext arg1, OutputCollector arg2) { this.collector =
arg2; } public void declareOutputFields(OutputFieldsDeclarer arg0) { } }


On Mon, Dec 1, 2014, at 07:10 PM, Madabhattula Rajesh Kumar wrote:
> Thank you Harsha for your response.
>
> I'm just printing the messages in printer bolt.
>
> Please find below printer blot code
>
> *public class PrintBolt extends BaseRichBolt * *{* * private static
> final long serialVersionUID = 1L;* * public void execute(Tuple tuple)
> * * {* * System.out.println("message " + tuple.getValues());* * }* *
> public void prepare(Map arg0, TopologyContext arg1, OutputCollector
> arg2) {* * }* * public void declareOutputFields(OutputFieldsDeclarer
> arg0) {* * }* *}*
>
> Regards, Rajesh
>
>
> On Tue, Dec 2, 2014 at 8:19 AM, Harsha <st...@harsha.io> wrote:
>> __
>> Does your printer bolt ack the messages it received from KafkaSpout.
>>
>>
>> On Mon, Dec 1, 2014, at 06:38 PM, Madabhattula Rajesh Kumar wrote:
>>> Hello,
>>>
>>> Could any one help me on above mail query?
>>>
>>> Regards, Rajesh
>>>
>>> On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar
>>> <mr...@gmail.com> wrote:
>>>> Hello,
>>>>
>>>> I'm new to Storm and Kafka. I have tried Strom-Kafka integration
>>>> example program. Now I'm able to send message from Kafka and
>>>> receive those messages in storm topology.
>>>>
>>>> I have observed one thing in storm topology, same messages are
>>>> processing continuously
>>>>
>>>> *I have sent three messages (First Message, Second Message, Third
>>>> Message ). These 3 messages processing continuously, please find
>>>> below console log file*
>>>>
>>>> *Could you please help me on below query*
>>>>  * *How to make sure that storm topology process messages one time
>>>>    successfully(Not multiple times). *
>>>>  * *What configurations I need to do * *Below is my code :*
>>>>
>>>> * BrokerHosts zk = new ZkHosts("localhost:2181");* * SpoutConfig
>>>>   spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm",
>>>>   "discovery");* * spoutConf.scheme = new SchemeAsMultiScheme(new
>>>>   StringScheme());* * KafkaSpout spout = new
>>>>   KafkaSpout(spoutConf);* * TopologyBuilder builder = new
>>>>   TopologyBuilder();* * builder.setSpout("spout", spout, 1);* *
>>>>   builder.setBolt("printerbolt", new PrintBolt())
>>>>   .shuffleGrouping("spout");* * Config config = new Config();* *
>>>>   config.setDebug(true);* * LocalCluster cluster = new
>>>>   LocalCluster();* * cluster.submitTopology("kafka", config,
>>>>   builder.createTopology());*
>>>>
>>>> *Log file :*
>>>>
>>>> 25526 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting:
>>>> spout default [First Message] 25528 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {-5148901491748001310=-1334200518948214946}, [First Message]
>>>> *message [First Message]* 25538 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [-5148901491748001310 -1334200518948214946 3] 25539
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [-5148901491748001310 -1334200518948214946 3] 33530
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [Second Message] 33531 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {-8623931148894813393=4843611232629293066}, [Second Message]
>>>> *message [Second Message]* 33531 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [-8623931148894813393 4843611232629293066 3] 33532
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [-8623931148894813393 4843611232629293066 3] 38532
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [Thrid Message] 38536 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {-7749553958395790620=-1739211867328620785}, [Thrid Message]
>>>> *message [Thrid Message]* 38537 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [-7749553958395790620 -1739211867328620785 3] 38537
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [-7749553958395790620 -1739211867328620785 3] 46201
>>>> [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing
>>>> received message source: __system:-1, stream: __tick, id: {}, [30]
>>>> 76155 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: __system:-1, stream:
>>>> __metrics_tick, id: {}, [60] 76159 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.task - Emitting: printerbolt __metrics
>>>> [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>>>> #<DataPoint [__receive = {write_pos=4, read_pos=3, capacity=1024,
>>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]>
>>>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
>>>> #<DataPoint [__execute-count = {}]>]] 76202 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: __system:-1, stream: __tick, id: {}, [30] 76206
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [First Message] 76206 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {956790162864404846=494501721511970112}, [First Message] *message
>>>> [First Message]* 76207 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [956790162864404846 494501721511970112 3] 76207 [Thread-14-__acker]
>>>> INFO backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: __ack_init, id: {}, [956790162864404846
>>>> 494501721511970112 3] 76207 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout default [Second
>>>> Message] 76208 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {-5947127688111528870=-2474569870953878080}, [Second Message]
>>>> *message [Second Message]* 76208 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [-5947127688111528870 -2474569870953878080 3] 76208
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [-5947127688111528870 -2474569870953878080 3] 76208
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [Thrid Message] 76209 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {4790700513589938438=-2542940781190231591}, [Thrid Message]
>>>> *message [Thrid Message]* 76209 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [4790700513589938438 -2542940781190231591 3] 76209
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [4790700513589938438 -2542940781190231591 3] 76276
>>>> [Thread-12-__system] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: __system:-1, stream:
>>>> __metrics_tick, id: {}, [60] 76277 [Thread-12-__system] INFO
>>>> backtype.storm.daemon.task - Emitting: __system __metrics
>>>> [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap =
>>>> {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232,
>>>> initBytes=64761920, virtualFreeBytes=979998072,
>>>> committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=1,
>>>> read_pos=0, capacity=1024, population=1}]> #<DataPoint
>>>> [__fail-count = {}]> #<DataPoint [__execute-latency = {}]>
>>>> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]>
>>>> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>>>> #<DataPoint [memory/nonHeap = {unusedBytes=171552,
>>>> usedBytes=41312736, maxBytes=224395264, initBytes=24313856,
>>>> virtualFreeBytes=183082528, committedBytes=41484288}]> #<DataPoint
>>>> [uptimeSecs = 76.666]> #<DataPoint [__transfer = {write_pos=12,
>>>> read_pos=12, capacity=1024, population=0}]> #<DataPoint
>>>> [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency
>>>> = {}]> #<DataPoint [__transfer-count = {}]>]] 76364
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: __system:-1, stream:
>>>> __metrics_tick, id: {}, [60] 76365 [Thread-14-__acker] INFO
>>>> backtype.storm.daemon.task - Emitting: __acker __metrics
>>>> [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>>>> #<DataPoint [__receive = {write_pos=7, read_pos=6, capacity=1024,
>>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>>>> [__transfer-count = {}]> #<DataPoint [__execute-latency =
>>>> {spout:__ack_init=0.0}]> #<DataPoint [__fail-count = {}]>
>>>> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count =
>>>> {spout:__ack_init=20}]>]] 76377 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: __system:-1, stream: __metrics_tick, id: {}, [60] 76381
>>>> [Thread-10-spout] WARN storm.kafka.KafkaUtils - No data found in
>>>> Kafka Partition partition_1 76382 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __metrics [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=11, read_pos=11, capacity=1024, population=0}]>
>>>> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive =
>>>> {write_pos=3, read_pos=2, capacity=1024, population=1}]>
>>>> #<DataPoint [kafkaPartition =
>>>> {Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPILatencyMax=19,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPICallCount=28451,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPICallCount=28452,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPIMessageCount=0,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPILatencyMean=0.05672711935892029,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPIMessageCount=6,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPILatencyMax=42,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
>>>> [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
>>>> #<DataPoint [__emit-count = {}]>]] 76943 [Thread-10-spout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>> connections 76949 [Thread-10-spout] INFO
>>>> storm.kafka.DynamicBrokersReader - Read partition info from
>>>> zookeeper:
>>>> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
>>>> 1=rajesh-VirtualBox:9092}} 76949 [Thread-10-spout] INFO
>>>> storm.kafka.KafkaUtils - Task [1/1] assigned
>>>> [Partition{host=rajesh-VirtualBox:9092, partition=0},
>>>> Partition{host=rajesh-VirtualBox:9092, partition=1}] 76949
>>>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1]
>>>> Deleted partition managers: [] 76949 [Thread-10-spout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>> 76949 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1]
>>>> Finished refreshing 106203 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: __system:-1, stream: __tick, id: {}, [30] 136154
>>>> [Thread-8-printerbolt] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: __system:-1, stream:
>>>> __metrics_tick, id: {}, [60] 136155 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.task - Emitting: printerbolt __metrics
>>>> [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>>>> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=1024,
>>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]>
>>>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
>>>> #<DataPoint [__execute-count = {}]>]] 136204 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: __system:-1, stream: __tick, id: {}, [30] 136206
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [First Message] 136206 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {3336041025082572443=4848943651836321291}, [First Message] *message
>>>> [First Message]* 136206 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [3336041025082572443 4848943651836321291 3] 136207
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [3336041025082572443 4848943651836321291 3] 136207
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [Second Message] 136208 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {8818700006514275130=7403177023020018790}, [Second Message]
>>>> *message [Second Message]* 136208 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [8818700006514275130 7403177023020018790 3] 136208
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [8818700006514275130 7403177023020018790 3] 136209
>>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout
>>>> default [Thrid Message] 136211 [Thread-8-printerbolt] INFO
>>>> backtype.storm.daemon.executor - Processing received message
>>>> source: spout:3, stream: default, id:
>>>> {7897209966477580404=-5223890645152565221}, [Thrid Message]
>>>> *message [Thrid Message]* 136211 [Thread-10-spout] INFO
>>>> backtype.storm.daemon.task - Emitting: spout __ack_init
>>>> [7897209966477580404 -5223890645152565221 3] 136211
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: spout:3, stream: __ack_init,
>>>> id: {}, [7897209966477580404 -5223890645152565221 3] 136276
>>>> [Thread-12-__system] INFO backtype.storm.daemon.executor -
>>>> Processing received message source: __system:-1, stream:
>>>> __metrics_tick, id: {}, [60] 136277 [Thread-12-__system] INFO
>>>> backtype.storm.daemon.task - Emitting: __system __metrics
>>>> [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26,
>>>> timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120,
>>>> usedBytes=27065368, maxBytes=1003487232, initBytes=64761920,
>>>> virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint
>>>> [__receive = {write_pos=2, read_pos=1, capacity=1024,
>>>> population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint
>>>> [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]>
>>>> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count =
>>>> {}]> #<DataPoint [GC/MarkSweepCompact = {count=0, timeMs=0}]>
>>>> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
>>>> capacity=1024, population=0}]> #<DataPoint [memory/nonHeap =
>>>> {unusedBytes=71848, usedBytes=41609048, maxBytes=224395264,
>>>> initBytes=24313856, virtualFreeBytes=182786216,
>>>> committedBytes=41680896}]> #<DataPoint [uptimeSecs = 136.665]>
>>>> #<DataPoint [__transfer = {write_pos=18, read_pos=18,
>>>> capacity=1024, population=0}]> #<DataPoint [startTimeSecs =
>>>> 1.417279417821E9]> #<DataPoint [__process-latency = {}]>
>>>> #<DataPoint [__transfer-count = {}]>]] 136364 [Thread-14-__acker]
>>>> INFO backtype.storm.daemon.executor - Processing received message
>>>> source: __system:-1, stream: __metrics_tick, id: {}, [60] 136364
>>>> [Thread-14-__acker] INFO backtype.storm.daemon.task - Emitting:
>>>> __acker __metrics [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>>>> #<DataPoint [__receive = {write_pos=11, read_pos=10, capacity=1024,
>>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]>
>>>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
>>>> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]] 136379
>>>> [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing
>>>> received message source: __system:-1, stream: __metrics_tick, id:
>>>> {}, [60] 136382 [Thread-10-spout] WARN storm.kafka.KafkaUtils - No
>>>> data found in Kafka Partition partition_1 136383 [Thread-10-spout]
>>>> INFO backtype.storm.daemon.task - Emitting: spout __metrics
>>>> [#<TaskInfo
>>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29>
>>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>>>> {write_pos=17, read_pos=17, capacity=1024, population=0}]>
>>>> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive =
>>>> {write_pos=6, read_pos=5, capacity=1024, population=1}]>
>>>> #<DataPoint [kafkaPartition =
>>>> {Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPILatencyMax=15,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPICallCount=30606,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPICallCount=30606,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPIMessageCount=0,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPILatencyMean=0.022773312422400837,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPIMessageCount=3,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=1}/fetchAPILatencyMax=16,
>>>> Partition{host=rajesh-VirtualBox:9092,
>>>> partition=0}/fetchAPILatencyMean=0.026694112265568844}]>
>>>> #<DataPoint [__transfer-count = {default=20}]> #<DataPoint
>>>> [__fail-count = {default=0}]> #<DataPoint [__emit-count =
>>>> {default=20}]>]] 136950 [Thread-10-spout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>> connections 136954 [Thread-10-spout] INFO
>>>> storm.kafka.DynamicBrokersReader - Read partition info from
>>>> zookeeper:
>>>> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
>>>> 1=rajesh-VirtualBox:9092}} 136954 [Thread-10-spout] INFO
>>>> storm.kafka.KafkaUtils - Task [1/1] assigned
>>>> [Partition{host=rajesh-VirtualBox:9092, partition=0},
>>>> Partition{host=rajesh-VirtualBox:9092, partition=1}] 136954
>>>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1]
>>>> Deleted partition managers: [] 136954 [Thread-10-spout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>> 136954 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task
>>>> [1/1] Finished refreshing
>>>>
>>>> Regards, Rajesh
>>>
>>
>


Re: Need help on kafka-storm

Posted by Madabhattula Rajesh Kumar <mr...@gmail.com>.
Thank you Harsha for your response.

I'm just printing the messages in printer bolt.

Please find below printer blot code












*public class PrintBolt extends BaseRichBolt   {    private static final
long serialVersionUID = 1L;    public void execute(Tuple tuple)
{         System.out.println("message " + tuple.getValues());    }   public
void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {   }
public void declareOutputFields(OutputFieldsDeclarer arg0) {  }}*

Regards,
Rajesh


On Tue, Dec 2, 2014 at 8:19 AM, Harsha <st...@harsha.io> wrote:

>  Does your printer bolt ack the messages it received from KafkaSpout.
>
>
> On Mon, Dec 1, 2014, at 06:38 PM, Madabhattula Rajesh Kumar wrote:
>
> Hello,
>
> Could any one help me on above mail query?
>
> Regards,
> Rajesh
>
> On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar <
> mrajaforu@gmail.com> wrote:
>
> Hello,
>
> I'm new to Storm and Kafka. I have tried Strom-Kafka integration example
> program. Now I'm able to send message from Kafka and receive those messages
> in storm topology.
>
>  I have observed one thing in storm topology, same messages are
> processing continuously
>
>  *I have sent three messages (First Message, Second Message, Third
> Message ). These 3 messages processing continuously, please find below
> console log file*
>
> *Could you please help me on below query*
>
>    - *How to make sure that storm topology process messages one time
>    successfully(Not multiple times). *
>    - *What configurations I need to do *
>
> *Below is my code :*
>
>  * BrokerHosts zk = new ZkHosts("localhost:2181");*
> *        SpoutConfig spoutConf = new SpoutConfig(zk, "test-topic",
> "/kafkastorm", "discovery");*
> *        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());*
> *        KafkaSpout spout = new KafkaSpout(spoutConf);*
> *        TopologyBuilder builder = new TopologyBuilder();*
> *        builder.setSpout("spout", spout, 1);*
> *        builder.setBolt("printerbolt", new PrintBolt())
> .shuffleGrouping("spout");*
> *        Config config = new Config();*
> *        config.setDebug(true);*
> *        LocalCluster cluster = new LocalCluster();*
> *        cluster.submitTopology("kafka", config,
> builder.createTopology());*
>
>  *Log file :*
>
> 25526 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [First Message]
> 25528 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-5148901491748001310=-1334200518948214946}, [First Message]
> *message [First Message]*
> 25538 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-5148901491748001310 -1334200518948214946 3]
> 25539 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-5148901491748001310 -1334200518948214946 3]
> 33530 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Second Message]
> 33531 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-8623931148894813393=4843611232629293066}, [Second Message]
> *message [Second Message]*
> 33531 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-8623931148894813393 4843611232629293066 3]
> 33532 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-8623931148894813393 4843611232629293066 3]
> 38532 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Thrid Message]
> 38536 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-7749553958395790620=-1739211867328620785}, [Thrid Message]
> *message [Thrid Message]*
> 38537 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-7749553958395790620 -1739211867328620785 3]
> 38537 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-7749553958395790620 -1739211867328620785 3]
> 46201 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 76155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76159 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
> printerbolt __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=4, read_pos=3, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 76202 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 76206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [First Message]
> 76206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {956790162864404846=494501721511970112}, [First Message]
> *message [First Message]*
> 76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [956790162864404846 494501721511970112 3]
> 76207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [956790162864404846 494501721511970112 3]
> 76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Second Message]
> 76208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-5947127688111528870=-2474569870953878080}, [Second Message]
> *message [Second Message]*
> 76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Thrid Message]
> 76209 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {4790700513589938438=-2542940781190231591}, [Thrid Message]
> *message [Thrid Message]*
> 76209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [4790700513589938438 -2542940781190231591 3]
> 76209 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [4790700513589938438 -2542940781190231591 3]
> 76276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
> __system __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap =
> {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232,
> initBytes=64761920, virtualFreeBytes=979998072, committedBytes=62783488}]>
> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024,
> population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint
> [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
> #<DataPoint [memory/nonHeap = {unusedBytes=171552, usedBytes=41312736,
> maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528,
> committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]> #<DataPoint
> [__transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]>
> #<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
> 76364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76365 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
> __acker __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=7, read_pos=6, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {spout:__ack_init=0.0}]> #<DataPoint [__fail-count =
> {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count =
> {spout:__ack_init=20}]>]]
> 76377 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76381 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
> Kafka Partition partition_1
> 76382 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=11,
> read_pos=11, capacity=1024, population=0}]> #<DataPoint [__complete-latency
> = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
> population=1}]> #<DataPoint [kafkaPartition =
> {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=19,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPICallCount=28451,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPICallCount=28452,
> Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPILatencyMean=0.05672711935892029,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=6,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=42,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
> [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
> #<DataPoint [__emit-count = {}]>]]
> 76943 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 76949 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
> 1=rajesh-VirtualBox:9092}}
> 76949 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
> [Partition{host=rajesh-VirtualBox:9092, partition=0},
> Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: []
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
> 106203 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 136154 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
> printerbolt __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=8, read_pos=7, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 136204 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [First Message]
> 136206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {3336041025082572443=4848943651836321291}, [First Message]
> *message [First Message]*
> 136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [Second Message]
> 136208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {8818700006514275130=7403177023020018790}, [Second Message]
> *message [Second Message]*
> 136208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [8818700006514275130 7403177023020018790 3]
> 136208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [8818700006514275130 7403177023020018790 3]
> 136209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [Thrid Message]
> 136211 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {7897209966477580404=-5223890645152565221}, [Thrid Message]
> *message [Thrid Message]*
> 136211 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [7897209966477580404 -5223890645152565221 3]
> 136211 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [7897209966477580404 -5223890645152565221 3]
> 136276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
> __system __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26,
> timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120,
> usedBytes=27065368, maxBytes=1003487232, initBytes=64761920,
> virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint
> [__receive = {write_pos=2, read_pos=1, capacity=1024, population=1}]>
> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]>
> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]>
> #<DataPoint [__execute-count = {}]> #<DataPoint [GC/MarkSweepCompact =
> {count=0, timeMs=0}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap =
> {unusedBytes=71848, usedBytes=41609048, maxBytes=224395264,
> initBytes=24313856, virtualFreeBytes=182786216, committedBytes=41680896}]>
> #<DataPoint [uptimeSecs = 136.665]> #<DataPoint [__transfer =
> {write_pos=18, read_pos=18, capacity=1024, population=0}]> #<DataPoint
> [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency = {}]>
> #<DataPoint [__transfer-count = {}]>]]
> 136364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136364 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
> __acker __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=11, read_pos=10, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]]
> 136379 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136382 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
> Kafka Partition partition_1
> 136383 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=17,
> read_pos=17, capacity=1024, population=0}]> #<DataPoint [__complete-latency
> = {}]> #<DataPoint [__receive = {write_pos=6, read_pos=5, capacity=1024,
> population=1}]> #<DataPoint [kafkaPartition =
> {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=15,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPICallCount=30606,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPICallCount=30606,
> Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPILatencyMean=0.022773312422400837,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=3,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=16,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint
> [__transfer-count = {default=20}]> #<DataPoint [__fail-count =
> {default=0}]> #<DataPoint [__emit-count = {default=20}]>]]
> 136950 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 136954 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
> 1=rajesh-VirtualBox:9092}}
> 136954 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1]
> assigned [Partition{host=rajesh-VirtualBox:9092, partition=0},
> Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: []
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
>
> Regards,
> Rajesh
>
>
>
>
>

Re: Need help on kafka-storm

Posted by Harsha <st...@harsha.io>.
Does your printer bolt ack the messages it received from KafkaSpout.


On Mon, Dec 1, 2014, at 06:38 PM, Madabhattula Rajesh Kumar wrote:
> Hello,
>
> Could any one help me on above mail query?
>
> Regards, Rajesh
>
> On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar
> <mr...@gmail.com> wrote:
>> Hello,
>>
>> I'm new to Storm and Kafka. I have tried Strom-Kafka integration
>> example program. Now I'm able to send message from Kafka and receive
>> those messages in storm topology.
>>
>> I have observed one thing in storm topology, same messages are
>> processing continuously
>>
>> *I have sent three messages (First Message, Second Message, Third
>> Message ). These 3 messages processing continuously, please find
>> below console log file*
>>
>> *Could you please help me on below query*
>>  * *How to make sure that storm topology process messages one time
>>    successfully(Not multiple times). *
>>  * *What configurations I need to do * *Below is my code :*
>>
>> * BrokerHosts zk = new ZkHosts("localhost:2181");* * SpoutConfig
>>   spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm",
>>   "discovery");* * spoutConf.scheme = new SchemeAsMultiScheme(new
>>   StringScheme());* * KafkaSpout spout = new KafkaSpout(spoutConf);*
>>   * TopologyBuilder builder = new TopologyBuilder();* *
>>   builder.setSpout("spout", spout, 1);* *
>>   builder.setBolt("printerbolt", new PrintBolt())
>>   .shuffleGrouping("spout");* * Config config = new Config();* *
>>   config.setDebug(true);* * LocalCluster cluster = new
>>   LocalCluster();* * cluster.submitTopology("kafka", config,
>>   builder.createTopology());*
>>
>> *Log file :*
>>
>> 25526 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting:
>> spout default [First Message] 25528 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {-5148901491748001310=-1334200518948214946}, [First Message] *message
>> [First Message]* 25538 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [-5148901491748001310 -1334200518948214946 3] 25539
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [-5148901491748001310 -1334200518948214946 3] 33530 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [Second
>> Message] 33531 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {-8623931148894813393=4843611232629293066}, [Second Message] *message
>> [Second Message]* 33531 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [-8623931148894813393 4843611232629293066 3] 33532
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [-8623931148894813393 4843611232629293066 3] 38532 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [Thrid
>> Message] 38536 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {-7749553958395790620=-1739211867328620785}, [Thrid Message] *message
>> [Thrid Message]* 38537 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [-7749553958395790620 -1739211867328620785 3] 38537
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [-7749553958395790620 -1739211867328620785 3] 46201 [Thread-10-spout]
>> INFO backtype.storm.daemon.executor - Processing received message
>> source: __system:-1, stream: __tick, id: {}, [30] 76155
>> [Thread-8-printerbolt] INFO backtype.storm.daemon.executor -
>> Processing received message source: __system:-1, stream:
>> __metrics_tick, id: {}, [60] 76159 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.task - Emitting: printerbolt __metrics
>> [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>> #<DataPoint [__receive = {write_pos=4, read_pos=3, capacity=1024,
>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]>
>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
>> #<DataPoint [__execute-count = {}]>]] 76202 [Thread-10-spout] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> __system:-1, stream: __tick, id: {}, [30] 76206 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [First
>> Message] 76206 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {956790162864404846=494501721511970112}, [First Message] *message
>> [First Message]* 76207 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [956790162864404846 494501721511970112 3] 76207 [Thread-14-__acker]
>> INFO backtype.storm.daemon.executor - Processing received message
>> source: spout:3, stream: __ack_init, id: {}, [956790162864404846
>> 494501721511970112 3] 76207 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout default [Second Message]
>> 76208 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor -
>> Processing received message source: spout:3, stream: default, id:
>> {-5947127688111528870=-2474569870953878080}, [Second Message]
>> *message [Second Message]* 76208 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [-5947127688111528870 -2474569870953878080 3] 76208
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [-5947127688111528870 -2474569870953878080 3] 76208 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [Thrid
>> Message] 76209 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {4790700513589938438=-2542940781190231591}, [Thrid Message] *message
>> [Thrid Message]* 76209 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [4790700513589938438 -2542940781190231591 3] 76209
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [4790700513589938438 -2542940781190231591 3] 76276
>> [Thread-12-__system] INFO backtype.storm.daemon.executor - Processing
>> received message source: __system:-1, stream: __metrics_tick, id: {},
>> [60] 76277 [Thread-12-__system] INFO backtype.storm.daemon.task -
>> Emitting: __system __metrics [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap =
>> {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232,
>> initBytes=64761920, virtualFreeBytes=979998072,
>> committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=1,
>> read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count
>> = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint
>> [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint
>> [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
>> read_pos=-1, capacity=1024, population=0}]> #<DataPoint
>> [memory/nonHeap = {unusedBytes=171552, usedBytes=41312736,
>> maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528,
>> committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]>
>> #<DataPoint [__transfer = {write_pos=12, read_pos=12, capacity=1024,
>> population=0}]> #<DataPoint [startTimeSecs = 1.417279417821E9]>
>> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count =
>> {}]>]] 76364 [Thread-14-__acker] INFO backtype.storm.daemon.executor
>> - Processing received message source: __system:-1, stream:
>> __metrics_tick, id: {}, [60] 76365 [Thread-14-__acker] INFO
>> backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>> #<DataPoint [__receive = {write_pos=7, read_pos=6, capacity=1024,
>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>> [__transfer-count = {}]> #<DataPoint [__execute-latency =
>> {spout:__ack_init=0.0}]> #<DataPoint [__fail-count = {}]> #<DataPoint
>> [__emit-count = {}]> #<DataPoint [__execute-count =
>> {spout:__ack_init=20}]>]] 76377 [Thread-10-spout] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> __system:-1, stream: __metrics_tick, id: {}, [60] 76381
>> [Thread-10-spout] WARN storm.kafka.KafkaUtils - No data found in
>> Kafka Partition partition_1 76382 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __metrics [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>> {write_pos=11, read_pos=11, capacity=1024, population=0}]>
>> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive =
>> {write_pos=3, read_pos=2, capacity=1024, population=1}]> #<DataPoint
>> [kafkaPartition = {Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPILatencyMax=19,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPICallCount=28451,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPICallCount=28452,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPIMessageCount=0,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPILatencyMean=0.05672711935892029,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPIMessageCount=6,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPILatencyMax=42,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
>> [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
>> #<DataPoint [__emit-count = {}]>]] 76943 [Thread-10-spout] INFO
>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>> connections 76949 [Thread-10-spout] INFO
>> storm.kafka.DynamicBrokersReader - Read partition info from
>> zookeeper:
>> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
>> 1=rajesh-VirtualBox:9092}} 76949 [Thread-10-spout] INFO
>> storm.kafka.KafkaUtils - Task [1/1] assigned
>> [Partition{host=rajesh-VirtualBox:9092, partition=0},
>> Partition{host=rajesh-VirtualBox:9092, partition=1}] 76949
>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted
>> partition managers: [] 76949 [Thread-10-spout] INFO
>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>> 76949 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1]
>> Finished refreshing 106203 [Thread-10-spout] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> __system:-1, stream: __tick, id: {}, [30] 136154
>> [Thread-8-printerbolt] INFO backtype.storm.daemon.executor -
>> Processing received message source: __system:-1, stream:
>> __metrics_tick, id: {}, [60] 136155 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.task - Emitting: printerbolt __metrics
>> [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=1024,
>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]>
>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
>> #<DataPoint [__execute-count = {}]>]] 136204 [Thread-10-spout] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> __system:-1, stream: __tick, id: {}, [30] 136206 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [First
>> Message] 136206 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {3336041025082572443=4848943651836321291}, [First Message] *message
>> [First Message]* 136206 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [3336041025082572443 4848943651836321291 3] 136207
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [3336041025082572443 4848943651836321291 3] 136207 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [Second
>> Message] 136208 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {8818700006514275130=7403177023020018790}, [Second Message] *message
>> [Second Message]* 136208 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [8818700006514275130 7403177023020018790 3] 136208
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [8818700006514275130 7403177023020018790 3] 136209 [Thread-10-spout]
>> INFO backtype.storm.daemon.task - Emitting: spout default [Thrid
>> Message] 136211 [Thread-8-printerbolt] INFO
>> backtype.storm.daemon.executor - Processing received message source:
>> spout:3, stream: default, id:
>> {7897209966477580404=-5223890645152565221}, [Thrid Message] *message
>> [Thrid Message]* 136211 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __ack_init
>> [7897209966477580404 -5223890645152565221 3] 136211
>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing
>> received message source: spout:3, stream: __ack_init, id: {},
>> [7897209966477580404 -5223890645152565221 3] 136276
>> [Thread-12-__system] INFO backtype.storm.daemon.executor - Processing
>> received message source: __system:-1, stream: __metrics_tick, id: {},
>> [60] 136277 [Thread-12-__system] INFO backtype.storm.daemon.task -
>> Emitting: __system __metrics [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26,
>> timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120,
>> usedBytes=27065368, maxBytes=1003487232, initBytes=64761920,
>> virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint
>> [__receive = {write_pos=2, read_pos=1, capacity=1024, population=1}]>
>> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency =
>> {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count =
>> {}]> #<DataPoint [__execute-count = {}]> #<DataPoint
>> [GC/MarkSweepCompact = {count=0, timeMs=0}]> #<DataPoint [__sendqueue
>> = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>> #<DataPoint [memory/nonHeap = {unusedBytes=71848, usedBytes=41609048,
>> maxBytes=224395264, initBytes=24313856, virtualFreeBytes=182786216,
>> committedBytes=41680896}]> #<DataPoint [uptimeSecs = 136.665]>
>> #<DataPoint [__transfer = {write_pos=18, read_pos=18, capacity=1024,
>> population=0}]> #<DataPoint [startTimeSecs = 1.417279417821E9]>
>> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count =
>> {}]>]] 136364 [Thread-14-__acker] INFO backtype.storm.daemon.executor
>> - Processing received message source: __system:-1, stream:
>> __metrics_tick, id: {}, [60] 136364 [Thread-14-__acker] INFO
>> backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
>> #<DataPoint [__receive = {write_pos=11, read_pos=10, capacity=1024,
>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint
>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]>
>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
>> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]] 136379
>> [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing
>> received message source: __system:-1, stream: __metrics_tick, id: {},
>> [60] 136382 [Thread-10-spout] WARN storm.kafka.KafkaUtils - No data
>> found in Kafka Partition partition_1 136383 [Thread-10-spout] INFO
>> backtype.storm.daemon.task - Emitting: spout __metrics [#<TaskInfo
>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29>
>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue =
>> {write_pos=17, read_pos=17, capacity=1024, population=0}]>
>> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive =
>> {write_pos=6, read_pos=5, capacity=1024, population=1}]> #<DataPoint
>> [kafkaPartition = {Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPILatencyMax=15,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPICallCount=30606,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPICallCount=30606,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPIMessageCount=0,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPILatencyMean=0.022773312422400837,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPIMessageCount=3,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=1}/fetchAPILatencyMax=16,
>> Partition{host=rajesh-VirtualBox:9092,
>> partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint
>> [__transfer-count = {default=20}]> #<DataPoint [__fail-count =
>> {default=0}]> #<DataPoint [__emit-count = {default=20}]>]] 136950
>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1]
>> Refreshing partition manager connections 136954 [Thread-10-spout]
>> INFO storm.kafka.DynamicBrokersReader - Read partition info from
>> zookeeper:
>> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
>> 1=rajesh-VirtualBox:9092}} 136954 [Thread-10-spout] INFO
>> storm.kafka.KafkaUtils - Task [1/1] assigned
>> [Partition{host=rajesh-VirtualBox:9092, partition=0},
>> Partition{host=rajesh-VirtualBox:9092, partition=1}] 136954
>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted
>> partition managers: [] 136954 [Thread-10-spout] INFO
>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>> 136954 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1]
>> Finished refreshing
>>
>> Regards, Rajesh
>


Re: Need help on kafka-storm

Posted by Madabhattula Rajesh Kumar <mr...@gmail.com>.
Hello,

Could any one help me on above mail query?

Regards,
Rajesh

On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar <
mrajaforu@gmail.com> wrote:

> Hello,
>
> I'm new to Storm and Kafka. I have tried Strom-Kafka integration example
> program. Now I'm able to send message from Kafka and receive those messages
> in storm topology.
>
> I have observed one thing in storm topology, same messages are processing
> continuously
>
> *I have sent three messages (First Message, Second Message, Third Message
> ). These 3 messages processing continuously, please find below console log
> file*
>
>
> *Could you please help me on below query*
>
>    -
> *How to make sure that storm topology process messages one time
>    successfully(Not multiple times). *
>    - *What configurations I need to do *
>
> *Below is my code :*
>
>
>
>
>
>
>
>
>
>
>
> * BrokerHosts zk = new ZkHosts("localhost:2181");        SpoutConfig
> spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm",
> "discovery");        spoutConf.scheme = new SchemeAsMultiScheme(new
> StringScheme());        KafkaSpout spout = new KafkaSpout(spoutConf);
>     TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spout", spout, 1);        builder.setBolt("printerbolt",
> new PrintBolt()) .shuffleGrouping("spout");        Config config = new
> Config();        config.setDebug(true);        LocalCluster cluster = new
> LocalCluster();        cluster.submitTopology("kafka", config,
> builder.createTopology());*
>
> *Log file :*
>
> 25526 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [First Message]
> 25528 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-5148901491748001310=-1334200518948214946}, [First Message]
> *message [First Message]*
> 25538 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-5148901491748001310 -1334200518948214946 3]
> 25539 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-5148901491748001310 -1334200518948214946 3]
> 33530 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Second Message]
> 33531 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-8623931148894813393=4843611232629293066}, [Second Message]
> *message [Second Message]*
> 33531 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-8623931148894813393 4843611232629293066 3]
> 33532 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-8623931148894813393 4843611232629293066 3]
> 38532 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Thrid Message]
> 38536 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-7749553958395790620=-1739211867328620785}, [Thrid Message]
> *message [Thrid Message]*
> 38537 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-7749553958395790620 -1739211867328620785 3]
> 38537 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-7749553958395790620 -1739211867328620785 3]
> 46201 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 76155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76159 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
> printerbolt __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=4, read_pos=3, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 76202 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 76206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [First Message]
> 76206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {956790162864404846=494501721511970112}, [First Message]
> *message [First Message]*
> 76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [956790162864404846 494501721511970112 3]
> 76207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [956790162864404846 494501721511970112 3]
> 76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Second Message]
> 76208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {-5947127688111528870=-2474569870953878080}, [Second Message]
> *message [Second Message]*
> 76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> default [Thrid Message]
> 76209 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {4790700513589938438=-2542940781190231591}, [Thrid Message]
> *message [Thrid Message]*
> 76209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __ack_init [4790700513589938438 -2542940781190231591 3]
> 76209 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [4790700513589938438 -2542940781190231591 3]
> 76276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
> __system __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap =
> {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232,
> initBytes=64761920, virtualFreeBytes=979998072, committedBytes=62783488}]>
> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024,
> population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint
> [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
> #<DataPoint [memory/nonHeap = {unusedBytes=171552, usedBytes=41312736,
> maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528,
> committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]> #<DataPoint
> [__transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]>
> #<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
> 76364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 76365 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
> __acker __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=7, read_pos=6, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {spout:__ack_init=0.0}]> #<DataPoint [__fail-count =
> {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count =
> {spout:__ack_init=20}]>]]
> 76377 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76381 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
> Kafka Partition partition_1
> 76382 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
> __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=11,
> read_pos=11, capacity=1024, population=0}]> #<DataPoint [__complete-latency
> = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
> population=1}]> #<DataPoint [kafkaPartition =
> {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=19,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPICallCount=28451,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPICallCount=28452,
> Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPILatencyMean=0.05672711935892029,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=6,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=42,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
> [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
> #<DataPoint [__emit-count = {}]>]]
> 76943 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 76949 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
> 1=rajesh-VirtualBox:9092}}
> 76949 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
> [Partition{host=rajesh-VirtualBox:9092, partition=0},
> Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: []
> 76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
> 106203 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 136154 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
> printerbolt __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=8, read_pos=7, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 136204 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [First Message]
> 136206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {3336041025082572443=4848943651836321291}, [First Message]
> *message [First Message]*
> 136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [Second Message]
> 136208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {8818700006514275130=7403177023020018790}, [Second Message]
> *message [Second Message]*
> 136208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [8818700006514275130 7403177023020018790 3]
> 136208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [8818700006514275130 7403177023020018790 3]
> 136209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout default [Thrid Message]
> 136211 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: default, id:
> {7897209966477580404=-5223890645152565221}, [Thrid Message]
> *message [Thrid Message]*
> 136211 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __ack_init [7897209966477580404 -5223890645152565221 3]
> 136211 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: spout:3, stream: __ack_init, id: {},
> [7897209966477580404 -5223890645152565221 3]
> 136276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
> __system __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26,
> timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120,
> usedBytes=27065368, maxBytes=1003487232, initBytes=64761920,
> virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint
> [__receive = {write_pos=2, read_pos=1, capacity=1024, population=1}]>
> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]>
> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]>
> #<DataPoint [__execute-count = {}]> #<DataPoint [GC/MarkSweepCompact =
> {count=0, timeMs=0}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap =
> {unusedBytes=71848, usedBytes=41609048, maxBytes=224395264,
> initBytes=24313856, virtualFreeBytes=182786216, committedBytes=41680896}]>
> #<DataPoint [uptimeSecs = 136.665]> #<DataPoint [__transfer =
> {write_pos=18, read_pos=18, capacity=1024, population=0}]> #<DataPoint
> [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency = {}]>
> #<DataPoint [__transfer-count = {}]>]]
> 136364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __metrics_tick,
> id: {}, [60]
> 136364 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
> __acker __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1,
> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive =
> {write_pos=11, read_pos=10, capacity=1024, population=1}]> #<DataPoint
> [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint
> [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]]
> 136379 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136382 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
> Kafka Partition partition_1
> 136383 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting:
> spout __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29>
> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=17,
> read_pos=17, capacity=1024, population=0}]> #<DataPoint [__complete-latency
> = {}]> #<DataPoint [__receive = {write_pos=6, read_pos=5, capacity=1024,
> population=1}]> #<DataPoint [kafkaPartition =
> {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=15,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPICallCount=30606,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPICallCount=30606,
> Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
> Partition{host=rajesh-VirtualBox:9092,
> partition=1}/fetchAPILatencyMean=0.022773312422400837,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=3,
> Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=16,
> Partition{host=rajesh-VirtualBox:9092,
> partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint
> [__transfer-count = {default=20}]> #<DataPoint [__fail-count =
> {default=0}]> #<DataPoint [__emit-count = {default=20}]>]]
> 136950 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 136954 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
> 1=rajesh-VirtualBox:9092}}
> 136954 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1]
> assigned [Partition{host=rajesh-VirtualBox:9092, partition=0},
> Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: []
> 136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
>
> Regards,
> Rajesh
>