Posted to user@storm.apache.org by Romeo Nocon <ro...@gmail.com> on 2015/06/17 22:07:02 UTC

Using FLUX and multiple streams to the same bolt

Hi,

I'm testing migrating one of my existing topologies to Flux. Here is the definition:

spouts:
  - id: "spout"

bolts:
  - id: "bolt_A"
    className: "com.blah.boltA"
    parallelism: 1
  - id: "bolt_B"
    className: "com.blah.boltB"
    parallelism: 1
  - id: "bolt_C"
    className: "com.blah.boltC"
    parallelism: 1
  - id: "bolt_D"
    className: "com.blah.boltD"
    parallelism: 1

streams:
  - name: ""
    from: "spout"
    to: "bolt_A"
    grouping:
      type: SHUFFLE
  - name: "A-->B"
    from: "bolt_A"
    to: "bolt_B"
    grouping:
      streamId: "forB"
  - name: "A-->C"
    from: "bolt_A"
    to: "bolt_C"
    grouping:
      streamId: "forC"
  - name: "B-->D"
    from: "bolt_B"
    to: "bolt_D"
  - name: "C-->D"
    from: "bolt_C"
    to: "bolt_D"

It builds a diamond-shaped topology like the one below (A -> B, A -> C,
B -> D, and C -> D):
---------------------------------------------------------
                  +--> Bolt_B --+
Spout -> Bolt_A --+             +--> Bolt_D
                  +--> Bolt_C --+
---------------------------------------------------------

Flux fails with the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Bolt has already been declared for id bolt_D
    at backtype.storm.topology.TopologyBuilder.validateUnusedId(TopologyBuilder.java:212)
    at backtype.storm.topology.TopologyBuilder.setBolt(TopologyBuilder.java:139)
    at org.apache.storm.flux.FluxBuilder.buildStreamDefinitions(FluxBuilder.java:158)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:94)
    at org.apache.storm.flux.Flux.runCli(Flux.java:153)
    at org.apache.storm.flux.Flux.main(Flux.java:98)

Looking at FluxBuilder.buildStreamDefinitions(), it iterates through each
of the defined streams and, for every one of them, calls

     builder.setBolt(stream.getTo()...).

Since I have two streams going to bolt_D, setBolt() is called twice with
the same id, and TopologyBuilder rejects the second call with the error
above. Does anyone already have a patch or fix for this?
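To make the failure concrete, here is a minimal, self-contained Java model of the loop described above. `MiniTopologyBuilder` and `StreamDef` are hypothetical stand-ins, not the real Storm/Flux classes; the builder only imitates the duplicate-id check that, according to the stack trace, TopologyBuilder.setBolt() performs via validateUnusedId().

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateBoltRepro {

    // Hypothetical stand-in for a Flux stream definition (only the fields the loop needs).
    record StreamDef(String from, String to) {}

    // Hypothetical stand-in for TopologyBuilder: like validateUnusedId(),
    // it rejects a second declaration of the same component id.
    static class MiniTopologyBuilder {
        private final Set<String> declared = new HashSet<>();

        void setBolt(String id) {
            if (!declared.add(id)) {
                throw new IllegalArgumentException("Bolt has already been declared for id " + id);
            }
        }
    }

    // Naive loop: one setBolt() call per stream, keyed by the stream's target.
    static void buildStreams(MiniTopologyBuilder builder, List<StreamDef> streams) {
        for (StreamDef stream : streams) {
            builder.setBolt(stream.to());
            // The grouping for this stream would be applied to the returned declarer here.
        }
    }

    public static void main(String[] args) {
        List<StreamDef> streams = List.of(
                new StreamDef("spout", "bolt_A"),
                new StreamDef("bolt_A", "bolt_B"),
                new StreamDef("bolt_A", "bolt_C"),
                new StreamDef("bolt_B", "bolt_D"),
                new StreamDef("bolt_C", "bolt_D")); // second stream into bolt_D
        try {
            buildStreams(new MiniTopologyBuilder(), streams);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints the same message as the real exception, triggered by the fifth stream definition.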

A possible fix is to cache the BoltDeclarer returned by setBolt(), keyed by
the getTo() id, and skip the builder.setBolt() call for bolts that have
already been declared, so the code can go on applying the different
groupings for the remaining streams. Just a thought.
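A minimal sketch of that caching approach, using hypothetical stand-ins (`MiniTopologyBuilder`, `MiniDeclarer`, `StreamDef`) rather than the real Storm classes, so it is not the actual Flux patch. Every stream is wired with a shuffle grouping here for simplicity; a real builder would apply whichever grouping each stream definition specifies. Streams without an explicit id use "default", which is Storm's default stream name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CachedDeclarerFix {

    // Hypothetical stand-in for a Flux stream definition.
    record StreamDef(String from, String to, String streamId) {}

    // Hypothetical stand-in for BoltDeclarer: records each input it subscribes to.
    static class MiniDeclarer {
        final List<String> inputs = new ArrayList<>();

        MiniDeclarer shuffleGrouping(String componentId, String streamId) {
            inputs.add(componentId + ":" + streamId);
            return this;
        }
    }

    // Hypothetical stand-in for TopologyBuilder: rejects duplicate ids like validateUnusedId().
    static class MiniTopologyBuilder {
        private final Set<String> ids = new HashSet<>();

        MiniDeclarer setBolt(String id) {
            if (!ids.add(id)) {
                throw new IllegalArgumentException("Bolt has already been declared for id " + id);
            }
            return new MiniDeclarer();
        }
    }

    // The proposed fix: cache the declarer by target id and call setBolt() only the first time.
    static Map<String, MiniDeclarer> buildStreams(MiniTopologyBuilder builder, List<StreamDef> streams) {
        Map<String, MiniDeclarer> declarers = new LinkedHashMap<>();
        for (StreamDef stream : streams) {
            MiniDeclarer declarer = declarers.computeIfAbsent(stream.to(), builder::setBolt);
            declarer.shuffleGrouping(stream.from(), stream.streamId());
        }
        return declarers;
    }

    public static void main(String[] args) {
        List<StreamDef> streams = List.of(
                new StreamDef("spout", "bolt_A", "default"),
                new StreamDef("bolt_A", "bolt_B", "forB"),
                new StreamDef("bolt_A", "bolt_C", "forC"),
                new StreamDef("bolt_B", "bolt_D", "default"),
                new StreamDef("bolt_C", "bolt_D", "default"));
        Map<String, MiniDeclarer> declarers = buildStreams(new MiniTopologyBuilder(), streams);
        declarers.forEach((id, d) -> System.out.println(id + " <- " + d.inputs));
    }
}
```

With the cache, bolt_D is declared once and main ends with `bolt_D <- [bolt_B:default, bolt_C:default]`.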

Thanks,
Romeo

Re: Using FLUX and multiple streams to the same bolt

Posted by Romeo Nocon <ro...@gmail.com>.
Thanks, Taylor!

On Wed, Jun 17, 2015 at 9:19 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> JIRA:
>
> https://issues.apache.org/jira/browse/STORM-873

Re: Using FLUX and multiple streams to the same bolt

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
JIRA:

https://issues.apache.org/jira/browse/STORM-873

On Jun 17, 2015, at 11:38 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> Romeo,
> 
> I have a fix (see below). Should be included in the next release (beta or final). I will follow up with a JIRA ID for tracking.


Re: Using FLUX and multiple streams to the same bolt

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Romeo,

I have a fix (see below). Should be included in the next release (beta or final). I will follow up with a JIRA ID for tracking.

---------- TOPOLOGY DETAILS ----------
Topology Name: diamond-topology
--------------- SPOUTS ---------------
spout-1 [1] (backtype.storm.testing.TestWordSpout)
---------------- BOLTS ---------------
A [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
B [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
C [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
D [1] (org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
--------------- STREAMS ---------------
spout-1 --FIELDS--> A
A --SHUFFLE--> B
A --SHUFFLE--> C
C --SHUFFLE--> D
B --SHUFFLE--> D
--------------------------------------
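In the STREAMS section, D is the target of two streams (from C and from B) while BOLTS declares it only once. An alternative to caching declarers inside the loop, sketched here and not taken from the actual patch, is to group the stream definitions by target first, then call setBolt() once per target and apply one grouping per incoming stream. `Edge` and `inputsByTarget` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InputsByTarget {

    // Hypothetical edge type mirroring one line of the STREAMS listing: from --TYPE--> to.
    record Edge(String from, String grouping, String to) {}

    // Collect every incoming edge per target, preserving declaration order.
    static Map<String, List<Edge>> inputsByTarget(List<Edge> edges) {
        Map<String, List<Edge>> byTarget = new LinkedHashMap<>();
        for (Edge e : edges) {
            byTarget.computeIfAbsent(e.to(), k -> new ArrayList<>()).add(e);
        }
        return byTarget;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
                new Edge("spout-1", "FIELDS", "A"),
                new Edge("A", "SHUFFLE", "B"),
                new Edge("A", "SHUFFLE", "C"),
                new Edge("C", "SHUFFLE", "D"),
                new Edge("B", "SHUFFLE", "D"));
        // One setBolt() per key, one grouping call per edge in the value list.
        inputsByTarget(edges).forEach((target, inputs) -> {
            StringBuilder line = new StringBuilder(target + " <-");
            for (Edge e : inputs) {
                line.append(' ').append(e.from()).append(" (").append(e.grouping()).append(')');
            }
            System.out.println(line);
        });
    }
}
```

Grouping first makes the "one declaration per bolt" rule explicit, at the cost of a second pass over the stream list.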

Thanks again for reporting this, and helping out with beta testing.

- Taylor


On Jun 17, 2015, at 4:54 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> Hi Romeo,
> 
> Thanks for reporting that. It’s a bug, and your approach for a fix is correct.


Re: Using FLUX and multiple streams to the same bolt

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Hi Romeo,

Thanks for reporting that. It’s a bug, and your approach for a fix is correct.

If you’d like, feel free to open a JIRA and optionally a pull request for a fix. Otherwise, I can take care of it.

-Taylor

On Jun 17, 2015, at 4:07 PM, Romeo Nocon <ro...@gmail.com> wrote:

> A possible fix is to cache the BoltDeclarer by getTo() id then skip
> the builder.setBolt method so the code can continue setting the
> different types of groupings on the rest of streams.  Just a thought.