Posted to user@storm.apache.org by Romeo Nocon <ro...@gmail.com> on 2015/06/17 22:07:02 UTC
Using FLUX and multiple streams to the same bolt
Hi,
I'm testing migrating one of my existing topologies over to Flux. The topology definition is:
spouts:
  - id: "spout"

bolts:
  - id: "bolt_A"
    className: "com.blah.boltA"
    parallelism: 1
  - id: "bolt_B"
    className: "com.blah.boltB"
    parallelism: 1
  - id: "bolt_C"
    className: "com.blah.boltC"
    parallelism: 1
  - id: "bolt_D"
    className: "com.blah.boltD"
    parallelism: 1

streams:
  - name: ""
    from: "spout"
    to: "bolt_A"
    grouping:
      type: SHUFFLE
  - name: "A-->B"
    from: "bolt_A"
    to: "bolt_B"
    grouping:
      streamId: "forB"
  - name: "A-->C"
    from: "bolt_A"
    to: "bolt_C"
    grouping:
      streamId: "forC"
  - name: "B-->D"
    from: "bolt_B"
    to: "bolt_D"
  - name: "C-->D"
    from: "bolt_C"
    to: "bolt_D"
It should build a diamond like the one below (edges A -> B, A -> C, B -> D, and C -> D):
---------------------------------------------------------
                 +-> Bolt_B -+
Spout -> Bolt_A -+           +-> Bolt_D
                 +-> Bolt_C -+
---------------------------------------------------------
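In this diamond, bolt_D is the `to:` target of two stream definitions, while every other bolt has exactly one. Below is a minimal sketch in plain Java with no Storm dependency that counts inbound streams per target bolt. The `Stream` record and the `inboundCounts`/`diamond` helpers are hypothetical. Only the from/to pairs are modeled, and they are copied from the YAML above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FanIn {
    // Hypothetical stand-in for one entry under "streams:" (only from/to are modeled).
    record Stream(String from, String to) {}

    // The five stream definitions from the YAML above.
    static List<Stream> diamond() {
        return List.of(
                new Stream("spout", "bolt_A"),
                new Stream("bolt_A", "bolt_B"),
                new Stream("bolt_A", "bolt_C"),
                new Stream("bolt_B", "bolt_D"),
                new Stream("bolt_C", "bolt_D"));
    }

    // Counts how many stream definitions name each bolt as their "to:" target.
    static Map<String, Integer> inboundCounts(List<Stream> streams) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by bolt id
        for (Stream s : streams) {
            counts.merge(s.to(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        inboundCounts(diamond()).forEach((bolt, n) ->
                System.out.println(bolt + ": " + n + " inbound stream(s)"));
    }
}
```

Running it reports 2 inbound streams for bolt_D and 1 for each of bolt_A, bolt_B and bolt_C.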
I get the error below from Flux:
Exception in thread "main" java.lang.IllegalArgumentException: Bolt has already been declared for id bolt_D
    at backtype.storm.topology.TopologyBuilder.validateUnusedId(TopologyBuilder.java:212)
    at backtype.storm.topology.TopologyBuilder.setBolt(TopologyBuilder.java:139)
    at org.apache.storm.flux.FluxBuilder.buildStreamDefinitions(FluxBuilder.java:158)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:94)
    at org.apache.storm.flux.Flux.runCli(Flux.java:153)
    at org.apache.storm.flux.Flux.main(Flux.java:98)
Looking at FluxBuilder.buildStreamDefinitions(), it iterates over each defined stream and calls builder.setBolt(stream.getTo(), ...) for that stream's target bolt.
Since two streams go to bolt_D, setBolt() is called twice with the same id, and the second call fails with the error above. Does anyone have a patch or fix for this already?
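Here is a minimal model of the failure in plain Java, with no Storm dependency. `MiniBuilder` and `MiniDeclarer` are hypothetical stand-ins for `TopologyBuilder` and `BoltDeclarer`, not Storm's real classes. The builder copies only the duplicate-id check behind the exception above, and every grouping is modeled as a shuffle grouping for simplicity. The loop declares the target bolt once per stream, as described above, so it fails on the second stream into bolt_D:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateDeclaration {
    // Hypothetical stand-in for BoltDeclarer: records the components a bolt reads from.
    static class MiniDeclarer {
        final List<String> inputs = new ArrayList<>();

        MiniDeclarer shuffleGrouping(String componentId) {
            inputs.add(componentId);
            return this;
        }
    }

    // Hypothetical stand-in for TopologyBuilder. Like the check that raised the
    // exception in the stack trace, it rejects a second setBolt() for the same id.
    static class MiniBuilder {
        final Map<String, MiniDeclarer> bolts = new HashMap<>();

        MiniDeclarer setBolt(String id) {
            if (bolts.containsKey(id)) {
                throw new IllegalArgumentException("Bolt has already been declared for id " + id);
            }
            MiniDeclarer declarer = new MiniDeclarer();
            bolts.put(id, declarer);
            return declarer;
        }
    }

    // Hypothetical stand-in for one stream definition (only from/to are modeled).
    record Stream(String from, String to) {}

    static List<Stream> diamond() {
        return List.of(
                new Stream("spout", "bolt_A"),
                new Stream("bolt_A", "bolt_B"),
                new Stream("bolt_A", "bolt_C"),
                new Stream("bolt_B", "bolt_D"),
                new Stream("bolt_C", "bolt_D"));
    }

    // The pattern described above: one setBolt() call per stream definition.
    static void buildStreams(MiniBuilder builder, List<Stream> streams) {
        for (Stream s : streams) {
            builder.setBolt(s.to()).shuffleGrouping(s.from());
        }
    }

    public static void main(String[] args) {
        try {
            buildStreams(new MiniBuilder(), diamond());
            System.out.println("built without errors");
        } catch (IllegalArgumentException e) {
            // Reached on the second stream into bolt_D ("C-->D").
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Bolt has already been declared for id bolt_D`.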
A possible fix is to cache the BoltDeclarer returned by builder.setBolt(), keyed by the getTo() id, and skip the setBolt() call when a declarer for that id already exists. The code could then keep applying each stream's grouping to the cached declarer. Just a thought.
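Below is a sketch of the suggested fix, modeled in plain Java with no Storm dependency. `MiniBuilder` and `MiniDeclarer` are hypothetical stand-ins for `TopologyBuilder` and `BoltDeclarer`, and all groupings are modeled as shuffle groupings. The declarer for a target id is created with `setBolt()` the first time that id appears. Later streams into the same bolt reuse it, so bolt_D is declared once and receives both inputs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CachedDeclarers {
    // Hypothetical stand-in for BoltDeclarer: records the components a bolt reads from.
    static class MiniDeclarer {
        final List<String> inputs = new ArrayList<>();

        MiniDeclarer shuffleGrouping(String componentId) {
            inputs.add(componentId);
            return this;
        }
    }

    // Hypothetical stand-in for TopologyBuilder; rejects duplicate bolt ids.
    static class MiniBuilder {
        final Map<String, MiniDeclarer> bolts = new LinkedHashMap<>(); // keeps declaration order

        MiniDeclarer setBolt(String id) {
            if (bolts.containsKey(id)) {
                throw new IllegalArgumentException("Bolt has already been declared for id " + id);
            }
            MiniDeclarer declarer = new MiniDeclarer();
            bolts.put(id, declarer);
            return declarer;
        }
    }

    // Hypothetical stand-in for one stream definition (only from/to are modeled).
    record Stream(String from, String to) {}

    static List<Stream> diamond() {
        return List.of(
                new Stream("spout", "bolt_A"),
                new Stream("bolt_A", "bolt_B"),
                new Stream("bolt_A", "bolt_C"),
                new Stream("bolt_B", "bolt_D"),
                new Stream("bolt_C", "bolt_D"));
    }

    // The proposed fix: cache one declarer per target id, so setBolt() runs once per bolt.
    static MiniBuilder buildStreams(List<Stream> streams) {
        MiniBuilder builder = new MiniBuilder();
        Map<String, MiniDeclarer> declarers = new HashMap<>();
        for (Stream s : streams) {
            MiniDeclarer declarer = declarers.computeIfAbsent(s.to(), builder::setBolt);
            // Each stream still gets its own grouping, applied to the shared declarer.
            declarer.shuffleGrouping(s.from());
        }
        return builder;
    }

    public static void main(String[] args) {
        MiniBuilder builder = buildStreams(diamond());
        builder.bolts.forEach((id, d) -> System.out.println(id + " <- " + d.inputs));
    }
}
```

With this version `main` prints `bolt_D <- [bolt_B, bolt_C]` as its last line instead of throwing. The separate `declarers` map reflects the caller's side of the proposal: the loop keeps its own cache rather than relying on the builder to look up an already-declared bolt.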
Thanks,
Romeo
Re: Using FLUX and multiple streams to the same bolt
Posted by Romeo Nocon <ro...@gmail.com>.
Thanks, Taylor!
On Wed, Jun 17, 2015 at 9:19 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
> JIRA:
>
> https://issues.apache.org/jira/browse/STORM-873
Re: Using FLUX and multiple streams to the same bolt
Posted by "P. Taylor Goetz" <pt...@gmail.com>.
JIRA:
https://issues.apache.org/jira/browse/STORM-873
On Jun 17, 2015, at 11:38 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
Re: Using FLUX and multiple streams to the same bolt
Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Romeo,
I have a fix (test output below). It should be included in the next release (beta or final). I will follow up with a JIRA ID for tracking.
---------- TOPOLOGY DETAILS ----------
Topology Name: diamond-topology
--------------- SPOUTS ---------------
spout-1 [1] (backtype.storm.testing.TestWordSpout)
---------------- BOLTS ---------------
A [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
B [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
C [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
D [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
--------------- STREAMS ---------------
spout-1 --FIELDS--> A
A --SHUFFLE--> B
A --SHUFFLE--> C
C --SHUFFLE--> D
B --SHUFFLE--> D
--------------------------------------
Thanks again for reporting this and for helping out with beta testing.
- Taylor
On Jun 17, 2015, at 4:54 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
Re: Using FLUX and multiple streams to the same bolt
Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Hi Romeo,
Thanks for reporting that. It’s a bug, and your approach for a fix is correct.
If you’d like, feel free to open a JIRA and optionally a pull request for a fix. Otherwise, I can take care of it.
-Taylor
On Jun 17, 2015, at 4:07 PM, Romeo Nocon <ro...@gmail.com> wrote: