Posted to user@commons.apache.org by Tim Dudgeon <td...@informaticsmatters.com> on 2008/11/02 12:26:25 UTC

Re: [PIPELINE] Questions about pipeline

See comments below.

Tim

Ken Tanaka wrote:
> 
> 
> Tim Dudgeon wrote:
>> Ken Tanaka wrote:
>>> Hi Tim, 
> ...
>>>>
>>> At present, the structure for storing stages is a linked list, and 
>>> branches are implemented as additional pipelines accessed by a name 
>>> through a HashMap. To generally handle branching and merging, a 
>>> directed acyclic graph (DAG) would better serve, but that would 
>>> require the pipeline code to be rewritten at this level. Arguments 
>>> could also be made for allowing cycles, as in directed graphs, but 
>>> that would be harder to debug, and with a GUI might be a step toward 
>>> a visual programming language--so I don't think this should be 
>>> pursued yet unless there are volunteers...
>>>
>>
>> I agree, a DAG would be better, but cycles could be needed too, so a 
>> directed graph (DG) would be better still.
>> But, yes, I am ideally wanting visual designer too.
>>
> I'd like a visual designer too at some point, but that's a ways off into 
> the future.
>>
>>>>
>>>> Taken together I can see a generalisation here using named ports 
>>>> (input and output), which is similar, but not identical, to your 
>>>> current concept of branches.
>>>>
>>>> So you have:
>>>> BaseStage.emit(String branch, Object obj);
>>>> whereas I would conceptually see this as:
>>>> emit(String port, Object obj);
>>>> and you have:
>>>> Stage.process(Object obj);
>>>> whereas I would conceptually see this as:
>>>> Stage.process(String port, Object obj);
>>>>
>>>> And when a pipeline is being assembled a downstream stage is 
>>>> attached to a particular port of a stage, not the stage itself. It 
>>>> then just receives data sent to that particular port, but not the 
>>>> other ports.
>>> I could see that this would work, but would need either modifying a 
>>> number of stages already written, or maybe creating a compatibility 
>>> stage driver that takes older style stages so that the input object 
>>> comes from a configured port name, usually "input", and sends the 
>>> output to configured output ports named "output" and whatever the 
>>> previous branch name(s) were, if any. Stages that used to look for 
>>> events for input should be rewritten to read multiple inputs ( 
>>> Stage.process(String port, Object obj) as you suggested). Events 
>>> would then be reserved for truly out-of-band signals between stages 
>>> rather than carrying data for processing.
>>
>> Agreed, I think this would be good. I think existing stages could be 
>> made compatible by having a default input and output port, and to use 
>> those if no specific port was specified.
>> A default in/out port would probably be necessary to allow simple 
>> auto-wiring.
>>
>>>>
>>>> I'd love to hear how compatible the current system is with this way 
>>>> of seeing things. Are we just talking about a new type of Stage 
>>>> implementation, or a more fundamental incompatibility at the API level.
>>>>
>>> I think you have some good ideas. This is changing the Stage 
>>> implementation, which affects on the order of 60 stages for us that 
>>> override the process method, unless the compatibility stage driver 
>>> works out. The top level pipeline would also be restructured. The 
>>> amount of work required puts this out of the near term for me to work 
>>> on it, but there may be other developers/contributors to take this on.
>>
>> I need to investigate more fully here, and consider the other options.
>> But potentially this is certainly of interest.
>>
>> So is all that's necessary to prototype this to create a new Stage 
>> implementation, with new emit( ... ) and process( ... ) methods?
> I'm thinking it's more involved than that. To really deal well with the 
> arbitrary number of downstream stages rather than just one means 
> changing the digester rules 
> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
> on specifying what follows. Normally a stage is connected to the 
> preceding stage if it is listed in that order in the configuration file. 
> This should be a default behavior, but if  stage2 and stage3 both follow 
> stage1 then some notation of which is the previous stage is needed.
> 
> stage1----stage2
>    |
>    |-----stage3
> 
> might be set up as conf_pipe.xml:
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1" 
> driverFactoryId="df1" stageId="stage1"/>
>   <stage className="com.demo.pipeline.stages.Stage2" 
> driverFactoryId="df1"/>
>   <stage className="com.demo.pipeline.stages.Stage3" 
> driverFactoryId="df1" follows="stage1"/>
> </pipeline>
> 
> I propose the 'follows="stage1"' attribute to connect stage3 to stage1 
> instead of stage2 immediately preceding. This seems cleaner than setting 
> up a branch and matching up branch key names between the branching stage 
> and the secondary pipeline(s). Can you think of a cleaner way to 
> configure this?

I think we're in danger of looking at this the wrong way. The XML should 
reflect the underlying data model, not drive it. But to stick with this 
paradigm I would think it might be best to explicitly define the 
connections in the model definition. Maybe something more like this:

<pipeline>
   ...
   <stage className="com.demo.pipeline.stages.Stage1" 	
	driverFactoryId="df1" stageId="stage1">
   </stage>
   <stage className="com.demo.pipeline.stages.Stage2"
	driverFactoryId="df1">
	<input stageId="stage1" outputPort="pass"/>
   </stage>
   <stage className="com.demo.pipeline.stages.Stage3"
	driverFactoryId="df1">
	<input stageId="stage1" outputPort="pass"/>
   </stage>
   <stage className="com.demo.pipeline.stages.Stage4"
	driverFactoryId="df1">
	<input stageId="stage1" outputPort="fail" inputPort="aPort"/>
   </stage>
</pipeline>

I think this would allow more flexibility, as:
1. a stage could define multiple inputs if it needed to.
2. each connection is explicitly defined and could have extra attributes 
added in future (e.g. a disable attribute to disable execution of that 
part of the pipeline).
3. The concept of input can probably be generalised to include the 
"feed", allowing multiple feeds to be used (as discussed earlier in this 
thread). e.g. stage1 would also have an input that would be the feed.
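To make the port idea concrete, here is a minimal sketch of what the 
emit/process signatures discussed above might look like (hypothetical 
class and method names, not the actual commons-pipeline API); a default 
port keeps old single-output stages working:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PortDemo {

    // Hypothetical port-based Stage: emit() and process() are keyed
    // by a port name, with a default port for backward compatibility.
    public abstract static class Stage {
        public static final String DEFAULT_PORT = "output";

        // Emitted objects keyed by output port name; in the real
        // framework these would feed downstream stage queues.
        private final Map<String, List<Object>> outputs = new HashMap<>();

        // Old-style emit falls back to the default port.
        protected void emit(Object obj) { emit(DEFAULT_PORT, obj); }

        protected void emit(String port, Object obj) {
            outputs.computeIfAbsent(port, p -> new ArrayList<>()).add(obj);
        }

        // New-style process is told which input port the object arrived on.
        public abstract void process(String inputPort, Object obj);

        public List<Object> drain(String port) {
            return outputs.getOrDefault(port, List.of());
        }
    }

    // Example stage with two output ports, "pass" and "fail".
    public static class ThresholdStage extends Stage {
        public void process(String inputPort, Object obj) {
            int v = (Integer) obj;
            emit(v >= 10 ? "pass" : "fail", v);
        }
    }
}
```

A downstream stage would then be attached to one named output port and 
only see objects emitted to that port.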


> 
> The Pipeline.java class will need to be modified to build and maintain a 
> DAG structure rather than a linked list. The incoming data are managed 
> by a queue in the stage driver, which would change to a group of queues, 
> allowing multiple inputs (ports). I'm assuming there is an open source 
> directed acyclic graph library out there that can replace the linked list.

If defined as I propose I'm not sure a specific graph library is 
necessary. The model just comprises a set of stages that know how they 
are connected; the connections are already implicit in the model.
But this probably needs more thought.
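A rough sketch of that point (hypothetical names throughout): if each 
stage records its own input connections, the pipeline is just a set of 
stages and the DAG is implicit, with no separate graph library needed:

```java
import java.util.ArrayList;
import java.util.List;

public class ModelDemo {

    // One connection per <input stageId=... outputPort=... inputPort=.../>
    // element in the proposed XML.
    public static class Connection {
        public final StageNode source;
        public final String outputPort;
        public final String inputPort;
        public Connection(StageNode source, String outputPort, String inputPort) {
            this.source = source;
            this.outputPort = outputPort;
            this.inputPort = inputPort;
        }
    }

    public static class StageNode {
        public final String id;
        public final List<Connection> inputs = new ArrayList<>();
        public StageNode(String id) { this.id = id; }

        public void connect(StageNode source, String outputPort, String inputPort) {
            inputs.add(new Connection(source, outputPort, inputPort));
        }
    }

    // Wire up the stage1 -> stage2/stage3/stage4 example from the XML above.
    public static StageNode[] buildExample() {
        StageNode s1 = new StageNode("stage1");
        StageNode s2 = new StageNode("stage2");
        StageNode s3 = new StageNode("stage3");
        StageNode s4 = new StageNode("stage4");
        s2.connect(s1, "pass", "input");
        s3.connect(s1, "pass", "input");
        s4.connect(s1, "fail", "aPort");
        return new StageNode[] { s1, s2, s3, s4 };
    }
}
```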



> 
> -Ken


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@commons.apache.org
For additional commands, e-mail: user-help@commons.apache.org


Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.
Thanks for confirming the examples. I figure that fleshing this out now 
should speed development at a future time.

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>>
>>
>> Tim Dudgeon wrote:
>>> See comments below.
>>>
>>> Tim
...
>
>
>>> 2. each connection is explicitly defined and could have extra 
>>> attributes added in future (e.g. a disable attribute to disable 
>>> execution of that part of the pipeline).
>>> 3. The concept of input can probably be generalised to include the 
>>> "feed", allowing multiple feeds to be used (as discussed earlier in 
>>> this thread). e.g. stage1 would also have an input that would be the 
>>> feed.
>>>
>> Do you envision a stage with two inputs (aPort and bPort) waiting 
>> until there are inputs on both before its stageDriver invokes the 
>> process method? If stage5 needs two inputs, and stage2 provides 3 
>> values and stage3 provides 2 values, there are just 2 complete pairs 
>> of values. The third value from stage2 could wait indefinitely for a 
>> matching input from stage3. Currently stages run until their queue is 
>> empty, but with multiple inputs that could be imbalanced, it might be 
>> better to set the quit condition to any one queue is empty and all 
>> upstream stages claim to be complete. Any non-empty queues on exit 
>> can trigger a warning.
>
> Yes, there are multiple scenarios here. e.g.
> 1. pairwise processing.
> a1 + b1
> a2 + b2
> a3 + b3
> ...
>
> 2. combinatorial processing
> a1 + b1
> a1 + b2
> a1 + b3
> a2 + b1
> a2 + b2
> ....
>
> (maybe others too)
>
>
> Clearly scenario 1's inputs have to be matched, and if they are not, 
> some exit rules would be required.
> Also, as you have mentioned before there are potential memory issues 
> with combinatorial processing of large sets of data, but I think 
> solutions can be found for this.
>
In general I prefer to stick to scenario 1. Any deviation can be handled 
by making a stage behave differently rather than putting this capability 
into the framework. A stage can be written to do nothing but accept 
pairwise arguments and emit combinatorial output; it can then be 
inserted before any stage to which you want to add combinatorial input 
processing.
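
The core of such an adapter stage might look like this (a hypothetical 
sketch, not existing code): it buffers its two inputs and emits every 
a/b combination, so the framework itself only ever needs pairwise 
semantics:

```java
import java.util.ArrayList;
import java.util.List;

public class CrossDemo {
    // Cross product of the buffered "a" and "b" inputs; a real stage
    // would emit each combined value to its output port instead of
    // collecting them in a list.
    public static List<String> combine(List<String> as, List<String> bs) {
        List<String> out = new ArrayList<>();
        for (String a : as)
            for (String b : bs)
                out.add(a + "+" + b);
        return out;
    }
}
```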

Ken



Re: [PIPELINE] Questions about pipeline

Posted by Tim Dudgeon <td...@informaticsmatters.com>.
Ken Tanaka wrote:
> 
> 
> Tim Dudgeon wrote:
>> See comments below.
>>
>> Tim
>>
>> Ken Tanaka wrote:
>>>
>>>
>>> Tim Dudgeon wrote:
>>>> Ken Tanaka wrote:
>>>>> Hi Tim, 
>>> ...
>>>>>>
>>>>> At present, the structure for storing stages is a linked list, and 
>>>>> branches are implemented as additional pipelines accessed by a name 
>>>>> through a HashMap. To generally handle branching and merging, a 
>>>>> directed acyclic graph (DAG) would better serve, but that would 
>>>>> require the pipeline code to be rewritten at this level. Arguments 
>>>>> could also be made for allowing cycles, as in directed graphs, but 
>>>>> that would be harder to debug, and with a GUI might be a step 
>>>>> toward a visual programming language--so I don't think this should 
>>>>> be pursued yet unless there are volunteers...
>>>>>
>>>>
>>>> I agree, a DAG would be better, but cycles could be needed too, so a 
>>>> directed graph (DG) would be better still.
>>>> But, yes, I am ideally wanting visual designer too.
>>>>
>>> I'd like a visual designer too at some point, but that's a ways off 
>>> into the future.
>>>>
>>>>>>
>>>>>> Taken together I can see a generalisation here using named ports 
>>>>>> (input and output), which is similar, but not identical, to your 
>>>>>> current concept of branches.
>>>>>>
>>>>>> So you have:
>>>>>> BaseStage.emit(String branch, Object obj);
>>>>>> whereas I would conceptually see this as:
>>>>>> emit(String port, Object obj);
>>>>>> and you have:
>>>>>> Stage.process(Object obj);
>>>>>> whereas I would conceptually see this as:
>>>>>> Stage.process(String port, Object obj);
>>>>>>
>>>>>> And when a pipeline is being assembled a downstream stage is 
>>>>>> attached to a particular port of a stage, not the stage itself. It 
>>>>>> then just receives data sent to that particular port, but not the 
>>>>>> other ports.
>>>>> I could see that this would work, but would need either modifying a 
>>>>> number of stages already written, or maybe creating a compatibility 
>>>>> stage driver that takes older style stages so that the input object 
>>>>> comes from a configured port name, usually "input", and sends the 
>>>>> output to configured output ports named "output" and whatever the 
>>>>> previous branch name(s) were, if any. Stages that used to look for 
>>>>> events for input should be rewritten to read multiple inputs ( 
>>>>> Stage.process(String port, Object obj) as you suggested). Events 
>>>>> would then be reserved for truly out-of-band signals between stages 
>>>>> rather than carrying data for processing.
>>>>
>>>> Agreed, I think this would be good. I think existing stages could be 
>>>> made compatible by having a default input and output port, and to 
>>>> use those if no specific port was specified.
>>>> A default in/out port would probably be necessary to allow simple 
>>>> auto-wiring.
>>>>
>>>>>>
>>>>>> I'd love to hear how compatible the current system is with this 
>>>>>> way of seeing things. Are we just talking about a new type of 
>>>>>> Stage implementation, or a more fundamental incompatibility at the 
>>>>>> API level.
>>>>>>
>>>>> I think you have some good ideas. This is changing the Stage 
>>>>> implementation, which affects on the order of 60 stages for us that 
>>>>> override the process method, unless the compatibility stage driver 
>>>>> works out. The top level pipeline would also be restructured. The 
>>>>> amount of work required puts this out of the near term for me to 
>>>>> work on it, but there may be other developers/contributors to take 
>>>>> this on.
>>>>
>>>> I need to investigate more fully here, and consider the other options.
>>>> But potentially this is certainly of interest.
>>>>
>>>> So is all that's necessary to prototype this to create a new Stage 
>>>> implementation, with new emit( ... ) and process( ... ) methods?
>>> I'm thinking it's more involved than that. To really deal well with 
>>> the arbitrary number of downstream stages rather than just one means 
>>> changing the digester rules 
>>> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
>>> on specifying what follows. Normally a stage is connected to the 
>>> preceding stage if it is listed in that order in the configuration 
>>> file. This should be a default behavior, but if  stage2 and stage3 
>>> both follow stage1 then some notation of which is the previous stage 
>>> is needed.
>>>
>>> stage1----stage2
>>>    |
>>>    |-----stage3
>>>
>>> might be set up as conf_pipe.xml:
>>> <pipeline>
>>>   ...
>>>   <stage className="com.demo.pipeline.stages.Stage1" 
>>> driverFactoryId="df1" stageId="stage1"/>
>>>   <stage className="com.demo.pipeline.stages.Stage2" 
>>> driverFactoryId="df1"/>
>>>   <stage className="com.demo.pipeline.stages.Stage3" 
>>> driverFactoryId="df1" follows="stage1"/>
>>> </pipeline>
>>>
>>> I propose the 'follows="stage1"' attribute to connect stage3 to 
>>> stage1 instead of stage2 immediately preceding. This seems cleaner 
>>> than setting up a branch and matching up branch key names between the 
>>> branching stage and the secondary pipeline(s). Can you think of a 
>>> cleaner way to configure this?
>>
>> I think we're in danger of looking at this the wrong way. The XML 
>> should reflect the underlying data model, not drive it. But to stick 
>> with this paradigm I would think it might be best to explicitly define 
>> the connections in the model definition. Maybe something more like this:
>>
>> <pipeline>
>>   ...
>>   <stage className="com.demo.pipeline.stages.Stage1"        
>> driverFactoryId="df1" stageId="stage1">
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage2"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
> Just to clarify, for Stage2, when you specify '<input stageId="stage1" 
> outputPort="pass"/>', 'outputPort="pass"' refers to an output port of 
> stage1 named "pass", and is not specifying that the stage2 output 
> port is named "pass", right? So Stage1 has two output ports, named 
> "pass" and "fail", and this would be documented somewhere so you knew 
> what to connect to when you wrote the configuration XML?

Yes. That's the idea. Maybe different names might help here.


>>   <stage className="com.demo.pipeline.stages.Stage3"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage4"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>>   </stage>
> So here Stage4 has an input port named "aPort" and it is loaded from the 
> stage1 output port named "fail"?


Correct again.

>> </pipeline>
>>
>> I think this would allow more flexibility, as:
>> 1. a stage could define multiple inputs if it needed to.
> If I understand you correctly, suppose there is a stage5 that has input 
> ports "aPort" and "bPort" that we would like to receive data from stage2 
> and stage3 ("pass" output port from both). Then it would be specified as 
> follows:
> 
>  <stage className="com.demo.pipeline.stages.Stage5"
>    driverFactoryId="df1">
>    <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
>    <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
>  </stage>
> 

Correct again.

> I also assume that Stage2 and Stage3 are given stageIds of "stage2" and 
> "stage3" respectively.
> 
> [stage1]------------>[stage2]------------>[stage5]
>     |   pass->(in)           pass->aPort   ^
>     |                                      |
>     +-------------->[stage3]---------------+
>     |   pass->(in)           pass->bPort
>     |
>     +-------------->[stage4]
>         fail->aPort
> 

That's the idea. It's obviously a bit of an extreme case, as it uses 
branching and merging, but it covers what I think is needed.


>> 2. each connection is explicitly defined and could have extra 
>> attributes added in future (e.g. a disable attribute to disable 
>> execution of that part of the pipeline).
>> 3. The concept of input can probably be generalised to include the 
>> "feed", allowing multiple feeds to be used (as discussed earlier in 
>> this thread). e.g. stage1 would also have an input that would be the 
>> feed.
>>
> Do you envision a stage with two inputs (aPort and bPort) waiting until 
> there are inputs on both before its stageDriver invokes the process 
> method? If stage5 needs two inputs, and stage2 provides 3 values and 
> stage3 provides 2 values, there are just 2 complete pairs of values. The 
> third value from stage2 could wait indefinitely for a matching input 
> from stage3. Currently stages run until their queue is empty, but with 
> multiple inputs that could be imbalanced, it might be better to set the 
> quit condition to any one queue is empty and all upstream stages claim 
> to be complete. Any non-empty queues on exit can trigger a warning.

Yes, there are multiple scenarios here. e.g.
1. pairwise processing.
a1 + b1
a2 + b2
a3 + b3
...

2. combinatorial processing
a1 + b1
a1 + b2
a1 + b3
a2 + b1
a2 + b2
....

(maybe others too)


Clearly scenario 1's inputs have to be matched, and if they are not, 
some exit rules would be required.
Also, as you have mentioned before there are potential memory issues 
with combinatorial processing of large sets of data, but I think 
solutions can be found for this.
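
The exit rule for the pairwise case might be sketched like this 
(hypothetical names, not the current driver API): consume pairs while 
both queues are non-empty, stop once either queue is exhausted and the 
upstream stages are done, and warn about any leftovers:

```java
import java.util.Queue;

public class PairwiseDemo {
    // Returns the number of complete pairs processed.
    public static int processPairs(Queue<Object> a, Queue<Object> b,
                                   boolean upstreamDone) {
        int pairs = 0;
        while (!a.isEmpty() && !b.isEmpty()) {
            a.poll();
            b.poll();
            pairs++;           // in a real stage: process(aObj, bObj)
        }
        // Imbalanced inputs on exit trigger a warning rather than
        // blocking indefinitely for a match that will never arrive.
        if (upstreamDone && (!a.isEmpty() || !b.isEmpty())) {
            System.err.println("warning: " + (a.size() + b.size())
                    + " unmatched input(s) left on exit");
        }
        return pairs;
    }
}
```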


Tim


>>
>>>
>>> The Pipeline.java class will need to be modified to build and 
>>> maintain a DAG structure rather than a linked list. The incoming data 
>>> are managed by a queue in the stage driver, which would change to a 
>>> group of queues, allowing multiple inputs (ports). I'm assuming there 
>>> is an open source directed acyclic graph library out there that can 
>>> replace the linked list.
>>
>> If defined as I propose I'm not sure a specific graph library is 
>> necessary. The model just comprises a set of stages that know how they 
>> are connected; the connections are already implicit in the model.
>> But this probably needs more thought.
>>
> Currently the linked list of stages is used for lazy initialization, to 
> find the next stage feeder the first time it is used. To allow general 
> connections, the downstream feeder link could become an array of 
> subsequent stageDrivers, with the connections set up as the pipeline is 
> built. In that case, then a DAG library would not be needed, and we 
> could keep the linked list as is.
> 
> 
> -Ken




Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.

Tim Dudgeon wrote:
> See comments below.
>
> Tim
>
> Ken Tanaka wrote:
>>
>>
>> Tim Dudgeon wrote:
>>> Ken Tanaka wrote:
>>>> Hi Tim, 
>> ...
>>>>>
>>>> At present, the structure for storing stages is a linked list, and 
>>>> branches are implemented as additional pipelines accessed by a name 
>>>> through a HashMap. To generally handle branching and merging, a 
>>>> directed acyclic graph (DAG) would better serve, but that would 
>>>> require the pipeline code to be rewritten at this level. Arguments 
>>>> could also be made for allowing cycles, as in directed graphs, but 
>>>> that would be harder to debug, and with a GUI might be a step 
>>>> toward a visual programming language--so I don't think this should 
>>>> be pursued yet unless there are volunteers...
>>>>
>>>
>>> I agree, a DAG would be better, but cycles could be needed too, so a 
>>> directed graph (DG) would be better still.
>>> But, yes, I am ideally wanting visual designer too.
>>>
>> I'd like a visual designer too at some point, but that's a ways off 
>> into the future.
>>>
>>>>>
>>>>> Taken together I can see a generalisation here using named ports 
>>>>> (input and output), which is similar, but not identical, to your 
>>>>> current concept of branches.
>>>>>
>>>>> So you have:
>>>>> BaseStage.emit(String branch, Object obj);
>>>>> whereas I would conceptually see this as:
>>>>> emit(String port, Object obj);
>>>>> and you have:
>>>>> Stage.process(Object obj);
>>>>> whereas I would conceptually see this as:
>>>>> Stage.process(String port, Object obj);
>>>>>
>>>>> And when a pipeline is being assembled a downstream stage is 
>>>>> attached to a particular port of a stage, not the stage itself. It 
>>>>> then just receives data sent to that particular port, but not the 
>>>>> other ports.
>>>> I could see that this would work, but would need either modifying a 
>>>> number of stages already written, or maybe creating a compatibility 
>>>> stage driver that takes older style stages so that the input object 
>>>> comes from a configured port name, usually "input", and sends the 
>>>> output to configured output ports named "output" and whatever the 
>>>> previous branch name(s) were, if any. Stages that used to look for 
>>>> events for input should be rewritten to read multiple inputs ( 
>>>> Stage.process(String port, Object obj) as you suggested). Events 
>>>> would then be reserved for truly out-of-band signals between stages 
>>>> rather than carrying data for processing.
>>>
>>> Agreed, I think this would be good. I think existing stages could be 
>>> made compatible by having a default input and output port, and to 
>>> use those if no specific port was specified.
>>> A default in/out port would probably be necessary to allow simple 
>>> auto-wiring.
>>>
>>>>>
>>>>> I'd love to hear how compatible the current system is with this 
>>>>> way of seeing things. Are we just talking about a new type of 
>>>>> Stage implementation, or a more fundamental incompatibility at the 
>>>>> API level.
>>>>>
>>>> I think you have some good ideas. This is changing the Stage 
>>>> implementation, which affects on the order of 60 stages for us that 
>>>> override the process method, unless the compatibility stage driver 
>>>> works out. The top level pipeline would also be restructured. The 
>>>> amount of work required puts this out of the near term for me to 
>>>> work on it, but there may be other developers/contributors to take 
>>>> this on.
>>>
>>> I need to investigate more fully here, and consider the other options.
>>> But potentially this is certainly of interest.
>>>
>>> So is all that's necessary to prototype this to create a new Stage 
>>> implementation, with new emit( ... ) and process( ... ) methods?
>> I'm thinking it's more involved than that. To really deal well with 
>> the arbitrary number of downstream stages rather than just one means 
>> changing the digester rules 
>> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
>> on specifying what follows. Normally a stage is connected to the 
>> preceding stage if it is listed in that order in the configuration 
>> file. This should be a default behavior, but if  stage2 and stage3 
>> both follow stage1 then some notation of which is the previous stage 
>> is needed.
>>
>> stage1----stage2
>>    |
>>    |-----stage3
>>
>> might be set up as conf_pipe.xml:
>> <pipeline>
>>   ...
>>   <stage className="com.demo.pipeline.stages.Stage1" 
>> driverFactoryId="df1" stageId="stage1"/>
>>   <stage className="com.demo.pipeline.stages.Stage2" 
>> driverFactoryId="df1"/>
>>   <stage className="com.demo.pipeline.stages.Stage3" 
>> driverFactoryId="df1" follows="stage1"/>
>> </pipeline>
>>
>> I propose the 'follows="stage1"' attribute to connect stage3 to 
>> stage1 instead of stage2 immediately preceding. This seems cleaner 
>> than setting up a branch and matching up branch key names between the 
>> branching stage and the secondary pipeline(s). Can you think of a 
>> cleaner way to configure this?
>
> I think we're in danger of looking at this the wrong way. The XML 
> should reflect the underlying data model, not drive it. But to stick 
> with this paradigm I would think it might be best to explicitly define 
> the connections in the model definition. Maybe something more like this:
>
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1"    
>     driverFactoryId="df1" stageId="stage1">
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage2"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
Just to clarify, for Stage2, when you specify '<input stageId="stage1" 
outputPort="pass"/>', 'outputPort="pass"' refers to an output port of 
stage1 named "pass", and is not specifying that the stage2 output 
port is named "pass", right? So Stage1 has two output ports, named 
"pass" and "fail", and this would be documented somewhere so you knew 
what to connect to when you wrote the configuration XML?
>   <stage className="com.demo.pipeline.stages.Stage3"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage4"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>   </stage>
So here Stage4 has an input port named "aPort" and it is loaded from the 
stage1 output port named "fail"?
> </pipeline>
>
> I think this would allow more flexibility, as:
> 1. a stage could define multiple inputs if it needed to.
If I understand you correctly, suppose there is a stage5 that has input 
ports "aPort" and "bPort" that we would like to receive data from stage2 
and stage3 ("pass" output port from both). Then it would be specified as 
follows:

  <stage className="com.demo.pipeline.stages.Stage5"
    driverFactoryId="df1">
    <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
    <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
  </stage>

I also assume that Stage2 and Stage3 are given stageIds of "stage2" and 
"stage3" respectively.

[stage1]------------>[stage2]------------>[stage5]
     |   pass->(in)           pass->aPort   ^
     |                                      |
     +-------------->[stage3]---------------+
     |   pass->(in)           pass->bPort
     |
     +-------------->[stage4]
         fail->aPort

> 2. each connection is explicitly defined and could have extra 
> attributes added in future (e.g. a disable attribute to disable 
> execution of that part of the pipeline).
> 3. The concept of input can probably be generalised to include the 
> "feed", allowing multiple feeds to be used (as discussed earlier in 
> this thread). e.g. stage1 would also have an input that would be the 
> feed.
>
Do you envision a stage with two inputs (aPort and bPort) waiting until 
there are inputs on both before its stageDriver invokes the process 
method? If stage5 needs two inputs, and stage2 provides 3 values and 
stage3 provides 2 values, there are just 2 complete pairs of values. The 
third value from stage2 could wait indefinitely for a matching input 
from stage3. Currently stages run until their queue is empty, but with 
multiple inputs that could be imbalanced, it might be better to set the 
quit condition to any one queue is empty and all upstream stages claim 
to be complete. Any non-empty queues on exit can trigger a warning.
>
>>
>> The Pipeline.java class will need to be modified to build and 
>> maintain a DAG structure rather than a linked list. The incoming data 
>> are managed by a queue in the stage driver, which would change to a 
>> group of queues, allowing multiple inputs (ports). I'm assuming there 
>> is an open source directed acyclic graph library out there that can 
>> replace the linked list.
>
> If defined as I propose I'm not sure a specific graph library is 
> necessary. The model just comprises a set of stages that know how they 
> are connected; the connections are already implicit in the model.
> But this probably needs more thought.
>
Currently the linked list of stages is used for lazy initialization, to 
find the next stage feeder the first time it is used. To allow general 
connections, the downstream feeder link could become an array of 
subsequent stageDrivers, with the connections set up as the pipeline is 
built. In that case, a DAG library would not be needed, and we 
could keep the linked list as is.
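
The fan-out part of that might look something like this (a hypothetical 
sketch with made-up names, not the actual StageDriver/Feeder code): the 
single downstream link becomes a list of feeders, populated while the 
pipeline is built:

```java
import java.util.ArrayList;
import java.util.List;

public class FanoutDemo {
    public interface Feeder {
        void feed(Object obj);
    }

    public static class Driver {
        // Was a single "next stage" link; now one feeder per connection.
        private final List<Feeder> downstream = new ArrayList<>();

        public void addDownstream(Feeder f) { downstream.add(f); }

        // Emitted objects go to every connected downstream feeder.
        public void emit(Object obj) {
            for (Feeder f : downstream) f.feed(obj);
        }
    }
}
```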


-Ken
