Posted to user@storm.apache.org by Noppanit Charassinvichai <no...@gmail.com> on 2017/10/08 16:48:28 UTC

How to send a tuple to different bolt according to a value in the message

Hi,

I have a Storm cluster connected to a Kinesis stream. The messages look
like this:

{
_c: "a"
}

or like this:

{
_c: "b"
}

I would like to send a tuple with _c="a" to one bolt and _c="b" to a
different bolt. How do I achieve this?

This is the bolt that parses the message from Kinesis into a JSON object
using Gson:

    @Override
    public void execute(Tuple tuple) {
      String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
      String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
      byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);

      ByteBuffer buffer = ByteBuffer.wrap(payload);
      try {
        // Decode the raw Kinesis record bytes into a string.
        String data = decoder.decode(buffer).toString();

        // Parse the JSON payload into a map of field name to value.
        HashMap<String, Object> map = new Gson().fromJson(data,
            new TypeToken<HashMap<String, Object>>() {}.getType());

        // Emit anchored to the input tuple, then ack it.
        this.outputCollector.emit(tuple, new Values(map));
        this.outputCollector.ack(tuple);
      } catch (CharacterCodingException e) {
        // Fail the tuple if the payload cannot be decoded.
        this.outputCollector.fail(tuple);
      }
    }

Thanks

Re: How to send a tuple to different bolt according to a value in the message

Posted by Noppanit Charassinvichai <no...@gmail.com>.
Thanks. I'll try that.


Re: How to send a tuple to different bolt according to a value in the message

Posted by Stig Rohde Døssing <sr...@apache.org>.
Yes, though you might want to use collector.emit(streamid, tuple, value)
instead of collector.emit(streamid, value) so Storm keeps tracking whether
the tuple is acked downstream.

Also, you might consider extending BaseBasicBolt
https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/topology/base/BaseBasicBolt.html
instead of BaseRichBolt for bolts that just receive a tuple, do some
processing and immediately emit one or more new tuples. BaseBasicBolt takes
care of acking and anchoring for you, so it can make your code a little
simpler.


Re: How to send a tuple to different bolt according to a value in the message

Posted by Noppanit Charassinvichai <no...@gmail.com>.
I do have a follow-up question after reading the question on Stack Overflow.
Do I need a condition in my bolt to choose between the two streams?

For example:

if (tuple.get("_c").equals("a")) {
  collector.emit("stream1", new Values("field1Value"));
} else {
  collector.emit("stream2", new Values("field1Value"));
}



Re: How to send a tuple to different bolt according to a value in the message

Posted by Noppanit Charassinvichai <no...@gmail.com>.
Thanks for the reply.

Yes that's what I wanted to achieve.


Re: How to send a tuple to different bolt according to a value in the message

Posted by Stig Rohde Døssing <sr...@apache.org>.
Hi Noppanit,

Just to make sure we're talking about the same thing, you want to send from
OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b" to
BoltTwo, where BoltOne and BoltTwo are two different bolt implementations.

You need to make your bolt define multiple output streams, e.g. "stream1",
"stream2". You then need to make it emit _c="a" to "stream1" and _c="b" to
"stream2". When you build your topology, you then make BoltOne listen to
"stream1" and BoltTwo listen to "stream2".

There's some good example code here
https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
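
As a rough sketch of the routing step described above (not taken from the
linked answer), the choice of stream can be kept in a small helper that maps
the _c value to a stream id. The class name StreamRouter, its constants and
the null fallback are assumptions made for illustration. The comments show
where Storm's declareStream, emit and shuffleGrouping calls would fit; the
field name "message" and the component ids "originBolt", "boltOne" and
"boltTwo" are hypothetical.

```java
import java.util.Map;

// Hypothetical helper: picks the output stream for a parsed message.
final class StreamRouter {
    static final String STREAM_ONE = "stream1";
    static final String STREAM_TWO = "stream2";

    private StreamRouter() {}

    // Returns the stream id for the message, or null when _c is neither "a" nor "b".
    static String streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return STREAM_ONE;
        }
        if ("b".equals(c)) {
            return STREAM_TWO;
        }
        return null;
    }
}

// Where this would plug into the origin bolt and topology (Storm calls shown as comments):
//
//   declareOutputFields(OutputFieldsDeclarer declarer):
//     declarer.declareStream(StreamRouter.STREAM_ONE, new Fields("message"));
//     declarer.declareStream(StreamRouter.STREAM_TWO, new Fields("message"));
//
//   execute(Tuple tuple), after parsing the payload into map:
//     String streamId = StreamRouter.streamFor(map);
//     outputCollector.emit(streamId, tuple, new Values(map));  // anchored emit
//     outputCollector.ack(tuple);
//
//   topology wiring:
//     builder.setBolt("boltOne", new BoltOne()).shuffleGrouping("originBolt", StreamRouter.STREAM_ONE);
//     builder.setBolt("boltTwo", new BoltTwo()).shuffleGrouping("originBolt", StreamRouter.STREAM_TWO);
```

A message whose _c is neither "a" nor "b" yields null here. Whether to drop
such tuples, fail them or send them to a third stream is a design choice this
sketch leaves open.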
