You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Vineet Mishra <cl...@gmail.com> on 2015/02/23 11:15:22 UTC

Storm Field Grouping with specific fields

Hi All,

I am having a topology with Kafka Spout Implementation with the
topologyBuilder mentioned below,

        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
        builder.setBolt("Parser", new
ParserBolt()).globalGrouping("KafkaSpout");
        builder.setBolt("FileBolt", new
PersistBolt()).globalGrouping("Parser");

        Config config=new Config();
        config.put(Config.TOPOLOGY_WORKERS, 4);
        config.setNumWorkers(2);
        config.setMaxSpoutPending(10);
        config.setMaxTaskParallelism(10);

I am having two level of Bolts,

1) Parser - Parsing of data and emitting a output tuple value which is
containing POJO serialized object
2) Persist - Persisting of the forwarded data after some computation, which
is received through previous bolt(Parser).

Now I was looking out a way for the last PersistBolt("FileBolt") I want the
field grouping on the parser bolt based on the some field value(POJO) which
is being emitted.


To make it more clear,

Parser is emitting a POJO of the form,

collector.emit(new Values(responseHandler));

where responseHandler is a POJO,

public class ResponseHandler implements Serializable{

private String host = null;
private String user = null;
private String msg = null;
 public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getUser() {
return hostName;
}
public void setuser(String user) {
this.user = user;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
 }

Now I was looking out for a way to field group on the host and user level.

Actively looking for the way around!

Thanks!

Re: Storm Field Grouping with specific fields

Posted by Vineet Mishra <cl...@gmail.com>.
Well I got it. . .its just simply not possible to distribute the data
equally among all the working threads as it will compute key based on hash
and since I don't have many keys they tend to end up into same worker.

I guess that should be fine!

Thanks anyways!

On Mon, Feb 23, 2015 at 7:15 PM, Vineet Mishra <cl...@gmail.com>
wrote:

> Hi Nathan,
>
> Thanks for your revert but eventually what I am following is the same
> approach you have mentioned but still couldn't get the benefit of
> parallelism, so just to brief I have 3 node cluster setup with 1 supervisor
> and 2 workers.
>
> After running the instance, I could see that multiple tasks are running in
> the same node.
>
>   *Id* *Uptime* *Host* *Port* *Emitted* *Transferred* *Capacity (last
> 10m)* *Execute latency (ms)* *Executed* *Process latency (ms)* *Acked*
> *Failed*  *[5-5]}* *1m 56s* *ip-20-0-0-75* *6703*
> <http://ip-20-0-0-75:8000/log?file=worker-6703.log> *0* *0* *0.296* *0.02*
> *1723420* *0.02* *1723420* *0*  [4-4]} 30m 4s ip-20-0-0-78 6703
> <http://ip-20-0-0-78:8000/log?file=worker-6703.log> 0 0 0 0.029 9700 0.043
> 9720 0  [3-3]} 1m 56s ip-20-0-0-75 6703
> <http://ip-20-0-0-75:8000/log?file=worker-6703.log> 0 0 0.001 0.025 2380
> 0.017 2360 0  [2-2]} 30m 4s ip-20-0-0-78 6703
> <http://ip-20-0-0-78:8000/log?file=worker-6703.log> 0 0 0.004 0.025 400680
> 0.024 400680 0  [1-1]} 1m 56s ip-20-0-0-75 6703
> <http://ip-20-0-0-75:8000/log?file=worker-6703.log> 0 0 0.019 0.023 96480
> 0.02 96500 0
>
> Which seems to the reason behind why their is lag in running the topology.
> I was looking  out a way to curb this gap!
>
> Thanks!
>
>
> On Mon, Feb 23, 2015 at 6:30 PM, Nathan Leung <nc...@gmail.com> wrote:
>
>> You can put user and host in separate tuple fields and do fields grouping
>> on those fields.
>> On Feb 23, 2015 6:18 AM, "Vineet Mishra" <cl...@gmail.com> wrote:
>>
>>> I tried looking for a solution and could find this, CustomStreamGrouping
>>>
>>> I guess this should help me out, but I am getting an exception while
>>> implementing this.
>>>
>>> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>> at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>> at
>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>> at
>>> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.IndexOutOfBoundsException at
>>> clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107) at
>>> clojure.lang.PersistentVector.nth(PersistentVector.java:111) at
>>> clojure.lang.APersistentVector.get(APersistentVector.java:171) at
>>> com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
>>> at
>>> backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
>>> at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158) at
>>> backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
>>> at
>>> backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
>>> at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203) at
>>> backtype.storm.task.OutputCollector.emit(OutputCollector.java:49) at
>>> backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
>>> at
>>> backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
>>> at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76) at
>>> backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
>>> at
>>> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>>> at
>>> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>>> at
>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>> at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>> ... 6 more
>>>
>>> Let me know who has even faced the same issue.
>>>
>>> On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <cl...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am having a topology with Kafka Spout Implementation with the
>>>> topologyBuilder mentioned below,
>>>>
>>>>         TopologyBuilder builder=new TopologyBuilder();
>>>>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>>>>         builder.setBolt("Parser", new
>>>> ParserBolt()).globalGrouping("KafkaSpout");
>>>>         builder.setBolt("FileBolt", new
>>>> PersistBolt()).globalGrouping("Parser");
>>>>
>>>>         Config config=new Config();
>>>>         config.put(Config.TOPOLOGY_WORKERS, 4);
>>>>         config.setNumWorkers(2);
>>>>         config.setMaxSpoutPending(10);
>>>>         config.setMaxTaskParallelism(10);
>>>>
>>>> I am having two level of Bolts,
>>>>
>>>> 1) Parser - Parsing of data and emitting a output tuple value which is
>>>> containing POJO serialized object
>>>> 2) Persist - Persisting of the forwarded data after some computation,
>>>> which is received through previous bolt(Parser).
>>>>
>>>> Now I was looking out a way for the last PersistBolt("FileBolt") I want
>>>> the field grouping on the parser bolt based on the some field value(POJO)
>>>> which is being emitted.
>>>>
>>>>
>>>> To make it more clear,
>>>>
>>>> Parser is emitting a POJO of the form,
>>>>
>>>> collector.emit(new Values(responseHandler));
>>>>
>>>> where responseHandler is a POJO,
>>>>
>>>> public class ResponseHandler implements Serializable{
>>>>
>>>> private String host = null;
>>>> private String user = null;
>>>> private String msg = null;
>>>>  public String getHost() {
>>>> return host;
>>>> }
>>>> public void setHost(String host) {
>>>> this.host = host;
>>>> }
>>>> public String getUser() {
>>>> return hostName;
>>>> }
>>>> public void setuser(String user) {
>>>> this.user = user;
>>>> }
>>>> public String getMsg() {
>>>> return msg;
>>>> }
>>>> public void setMsg(String msg) {
>>>> this.msg = msg;
>>>> }
>>>>  }
>>>>
>>>> Now I was looking out for a way to field group on the host and user
>>>> level.
>>>>
>>>> Actively looking for the way around!
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>

Re: Storm Field Grouping with specific fields

Posted by Vineet Mishra <cl...@gmail.com>.
Hi Nathan,

Thanks for your revert but eventually what I am following is the same
approach you have mentioned but still couldn't get the benefit of
parallelism, so just to brief I have 3 node cluster setup with 1 supervisor
and 2 workers.

After running the instance, I could see that multiple tasks are running in
the same node.

  *Id* *Uptime* *Host* *Port* *Emitted* *Transferred* *Capacity (last
10m)* *Execute
latency (ms)* *Executed* *Process latency (ms)* *Acked* *Failed*  *[5-5]}* *1m
56s* *ip-20-0-0-75* *6703*
<http://ip-20-0-0-75:8000/log?file=worker-6703.log> *0* *0* *0.296* *0.02*
*1723420* *0.02* *1723420* *0*  [4-4]} 30m 4s ip-20-0-0-78 6703
<http://ip-20-0-0-78:8000/log?file=worker-6703.log> 0 0 0 0.029 9700 0.043
9720 0  [3-3]} 1m 56s ip-20-0-0-75 6703
<http://ip-20-0-0-75:8000/log?file=worker-6703.log> 0 0 0.001 0.025 2380
0.017 2360 0  [2-2]} 30m 4s ip-20-0-0-78 6703
<http://ip-20-0-0-78:8000/log?file=worker-6703.log> 0 0 0.004 0.025 400680
0.024 400680 0  [1-1]} 1m 56s ip-20-0-0-75 6703
<http://ip-20-0-0-75:8000/log?file=worker-6703.log> 0 0 0.019 0.023 96480
0.02 96500 0

Which seems to the reason behind why their is lag in running the topology.
I was looking  out a way to curb this gap!

Thanks!


On Mon, Feb 23, 2015 at 6:30 PM, Nathan Leung <nc...@gmail.com> wrote:

> You can put user and host in separate tuple fields and do fields grouping
> on those fields.
> On Feb 23, 2015 6:18 AM, "Vineet Mishra" <cl...@gmail.com> wrote:
>
>> I tried looking for a solution and could find this, CustomStreamGrouping
>>
>> I guess this should help me out, but I am getting an exception while
>> implementing this.
>>
>> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>> at
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>> at
>> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.IndexOutOfBoundsException at
>> clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107) at
>> clojure.lang.PersistentVector.nth(PersistentVector.java:111) at
>> clojure.lang.APersistentVector.get(APersistentVector.java:171) at
>> com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
>> at
>> backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
>> at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158) at
>> backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
>> at
>> backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
>> at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203) at
>> backtype.storm.task.OutputCollector.emit(OutputCollector.java:49) at
>> backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
>> at
>> backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
>> at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76) at
>> backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
>> at
>> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>> at
>> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>> at
>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>> ... 6 more
>>
>> Let me know who has even faced the same issue.
>>
>> On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <cl...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I am having a topology with Kafka Spout Implementation with the
>>> topologyBuilder mentioned below,
>>>
>>>         TopologyBuilder builder=new TopologyBuilder();
>>>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>>>         builder.setBolt("Parser", new
>>> ParserBolt()).globalGrouping("KafkaSpout");
>>>         builder.setBolt("FileBolt", new
>>> PersistBolt()).globalGrouping("Parser");
>>>
>>>         Config config=new Config();
>>>         config.put(Config.TOPOLOGY_WORKERS, 4);
>>>         config.setNumWorkers(2);
>>>         config.setMaxSpoutPending(10);
>>>         config.setMaxTaskParallelism(10);
>>>
>>> I am having two level of Bolts,
>>>
>>> 1) Parser - Parsing of data and emitting a output tuple value which is
>>> containing POJO serialized object
>>> 2) Persist - Persisting of the forwarded data after some computation,
>>> which is received through previous bolt(Parser).
>>>
>>> Now I was looking out a way for the last PersistBolt("FileBolt") I want
>>> the field grouping on the parser bolt based on the some field value(POJO)
>>> which is being emitted.
>>>
>>>
>>> To make it more clear,
>>>
>>> Parser is emitting a POJO of the form,
>>>
>>> collector.emit(new Values(responseHandler));
>>>
>>> where responseHandler is a POJO,
>>>
>>> public class ResponseHandler implements Serializable{
>>>
>>> private String host = null;
>>> private String user = null;
>>> private String msg = null;
>>>  public String getHost() {
>>> return host;
>>> }
>>> public void setHost(String host) {
>>> this.host = host;
>>> }
>>> public String getUser() {
>>> return hostName;
>>> }
>>> public void setuser(String user) {
>>> this.user = user;
>>> }
>>> public String getMsg() {
>>> return msg;
>>> }
>>> public void setMsg(String msg) {
>>> this.msg = msg;
>>> }
>>>  }
>>>
>>> Now I was looking out for a way to field group on the host and user
>>> level.
>>>
>>> Actively looking for the way around!
>>>
>>> Thanks!
>>>
>>
>>

Re: Storm Field Grouping with specific fields

Posted by Nathan Leung <nc...@gmail.com>.
You can put user and host in separate tuple fields and do fields grouping
on those fields.
On Feb 23, 2015 6:18 AM, "Vineet Mishra" <cl...@gmail.com> wrote:

> I tried looking for a solution and could find this, CustomStreamGrouping
>
> I guess this should help me out, but I am getting an exception while
> implementing this.
>
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> at
> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException at
> clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107) at
> clojure.lang.PersistentVector.nth(PersistentVector.java:111) at
> clojure.lang.APersistentVector.get(APersistentVector.java:171) at
> com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
> at
> backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
> at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158) at
> backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
> at
> backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
> at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203) at
> backtype.storm.task.OutputCollector.emit(OutputCollector.java:49) at
> backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
> at
> backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
> at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76) at
> backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
> at
> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
> at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> ... 6 more
>
> Let me know who has even faced the same issue.
>
> On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <cl...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I am having a topology with Kafka Spout Implementation with the
>> topologyBuilder mentioned below,
>>
>>         TopologyBuilder builder=new TopologyBuilder();
>>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>>         builder.setBolt("Parser", new
>> ParserBolt()).globalGrouping("KafkaSpout");
>>         builder.setBolt("FileBolt", new
>> PersistBolt()).globalGrouping("Parser");
>>
>>         Config config=new Config();
>>         config.put(Config.TOPOLOGY_WORKERS, 4);
>>         config.setNumWorkers(2);
>>         config.setMaxSpoutPending(10);
>>         config.setMaxTaskParallelism(10);
>>
>> I am having two level of Bolts,
>>
>> 1) Parser - Parsing of data and emitting a output tuple value which is
>> containing POJO serialized object
>> 2) Persist - Persisting of the forwarded data after some computation,
>> which is received through previous bolt(Parser).
>>
>> Now I was looking out a way for the last PersistBolt("FileBolt") I want
>> the field grouping on the parser bolt based on the some field value(POJO)
>> which is being emitted.
>>
>>
>> To make it more clear,
>>
>> Parser is emitting a POJO of the form,
>>
>> collector.emit(new Values(responseHandler));
>>
>> where responseHandler is a POJO,
>>
>> public class ResponseHandler implements Serializable{
>>
>> private String host = null;
>> private String user = null;
>> private String msg = null;
>>  public String getHost() {
>> return host;
>> }
>> public void setHost(String host) {
>> this.host = host;
>> }
>> public String getUser() {
>> return hostName;
>> }
>> public void setuser(String user) {
>> this.user = user;
>> }
>> public String getMsg() {
>> return msg;
>> }
>> public void setMsg(String msg) {
>> this.msg = msg;
>> }
>>  }
>>
>> Now I was looking out for a way to field group on the host and user level.
>>
>> Actively looking for the way around!
>>
>> Thanks!
>>
>
>

Re: Storm Field Grouping with specific fields

Posted by Vineet Mishra <cl...@gmail.com>.
I tried looking for a solution and could find this, CustomStreamGrouping

I guess this should help me out, but I am getting an exception while
implementing this.

java.lang.RuntimeException: java.lang.IndexOutOfBoundsException at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at
backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException at
clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107) at
clojure.lang.PersistentVector.nth(PersistentVector.java:111) at
clojure.lang.APersistentVector.get(APersistentVector.java:171) at
com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
at
backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158) at
backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
at
backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203) at
backtype.storm.task.OutputCollector.emit(OutputCollector.java:49) at
backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
at
backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76) at
backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at
backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
at
backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
... 6 more

Let me know who has even faced the same issue.

On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <cl...@gmail.com>
wrote:

> Hi All,
>
> I am having a topology with Kafka Spout Implementation with the
> topologyBuilder mentioned below,
>
>         TopologyBuilder builder=new TopologyBuilder();
>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>         builder.setBolt("Parser", new
> ParserBolt()).globalGrouping("KafkaSpout");
>         builder.setBolt("FileBolt", new
> PersistBolt()).globalGrouping("Parser");
>
>         Config config=new Config();
>         config.put(Config.TOPOLOGY_WORKERS, 4);
>         config.setNumWorkers(2);
>         config.setMaxSpoutPending(10);
>         config.setMaxTaskParallelism(10);
>
> I am having two level of Bolts,
>
> 1) Parser - Parsing of data and emitting a output tuple value which is
> containing POJO serialized object
> 2) Persist - Persisting of the forwarded data after some computation,
> which is received through previous bolt(Parser).
>
> Now I was looking out a way for the last PersistBolt("FileBolt") I want
> the field grouping on the parser bolt based on the some field value(POJO)
> which is being emitted.
>
>
> To make it more clear,
>
> Parser is emitting a POJO of the form,
>
> collector.emit(new Values(responseHandler));
>
> where responseHandler is a POJO,
>
> public class ResponseHandler implements Serializable{
>
> private String host = null;
> private String user = null;
> private String msg = null;
>  public String getHost() {
> return host;
> }
> public void setHost(String host) {
> this.host = host;
> }
> public String getUser() {
> return hostName;
> }
> public void setuser(String user) {
> this.user = user;
> }
> public String getMsg() {
> return msg;
> }
> public void setMsg(String msg) {
> this.msg = msg;
> }
>  }
>
> Now I was looking out for a way to field group on the host and user level.
>
> Actively looking for the way around!
>
> Thanks!
>