Posted to user@commons.apache.org by Ken Tanaka <Ke...@noaa.gov> on 2008/10/15 19:40:09 UTC

Re: [PIPELINE] Questions about pipeline

The Pipeline Basics tutorial has now been incorporated into the project
page. Thanks to some help and cleanup from Rahul Akolkar, the submitted
documentation was installed quickly. See

http://commons.apache.org/sandbox/pipeline/pipeline_basics.html

-Ken

Aquator wrote:
> Hi,
>
> I am playing with pipeline, and I have some questions regarding its usage.
>
> What is the suggested method of avoiding a "traffic jam" in the pipe? I mean the case where a stage produces results quickly and is followed by a long-running stage. With a large amount of input data I will run out of stack space.
>
> Currently, my solution uses the context raise/registerListener methods. The slow stage notifies its "feeder" stage that new data can be processed. Are there any better ideas for this problem?
>
> My other issue is about branches. Is there a way to join separate branches together? For example, a stage needs input from two different branches. Is there any way to get a synchronized data flow? (Let's say I have two branches; one produces As and the other produces Bs. I want a stage that is fed by those two branches and produces the sequence ABABAB...) Is there an implementation of such behaviour?
>
> Finally, I am interested in the various StageDrivers. I'd like some more detailed information
> than the API offers, especially usage advice and samples, to help choose the best StageDriver for particular stages.
>
> Thanks in advance for your time,
> Istvan Cseh 
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@commons.apache.org
> For additional commands, e-mail: user-help@commons.apache.org
>
>   
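
One common way to avoid the kind of traffic jam described above is to put a bounded queue between the fast stage and the slow stage, so that the producer blocks while the queue is full. The sketch below uses only java.util.concurrent and is not the Commons Pipeline API; the class name BoundedHandoffDemo, the method run, and the POISON end-of-data marker are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: none of these names come from Commons Pipeline.
final class BoundedHandoffDemo {
    private static final Integer POISON = Integer.MIN_VALUE; // end-of-data marker

    /** Runs a fast producer against a slow consumer through a bounded queue. */
    static List<Integer> run(int items, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> processed = new ArrayList<>();

        Thread slowStage = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take();   // blocks while the queue is empty
                    if (value.equals(POISON)) {
                        return;
                    }
                    Thread.sleep(1);                // simulate long-running work
                    processed.add(value * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowStage.start();

        try {
            for (int i = 0; i < items; i++) {
                queue.put(i);                       // blocks while the queue is full
            }
            queue.put(POISON);
            slowStage.join();                       // results are safe to read after join
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(20, 4));
    }
}
```

With a capacity of 4, at most four items wait between the two stages no matter how large the input is, so memory use stays flat at the cost of the fast stage idling.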




Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.
Thanks for confirming the examples. I figure that fleshing this out now 
should speed development at a future time.

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>>
>>
>> Tim Dudgeon wrote:
>>> See comments below.
>>>
>>> Tim
...
>
>
>>> 2. each connection is explicitly defined and could have extra
>>> attributes added in future (e.g. a disable attribute to disable
>>> execution of that part of the pipeline).
>>> 3. The concept of input can probably be generalised to include the 
>>> "feed", allowing multiple feeds to be used (as discussed earlier in 
>>> this thread). e.g. stage1 would also have an input that would be the 
>>> feed.
>>>
>> Do you envision a stage with two inputs (aPort and bPort) waiting 
>> until there are inputs on both before its stageDriver invokes the 
>> process method? If stage5 needs two inputs, and stage2 provides 3 
>> values and stage3 provides 2 values, there are just 2 complete pairs 
>> of values. The third value from stage2 could wait indefinitely for a 
>> matching input from stage3. Currently stages run until their queue is 
>> empty, but with multiple inputs that could be imbalanced, it might be 
>> better to quit when any one queue is empty and all upstream stages
>> report that they are complete. Any non-empty queues on exit
>> can trigger a warning.
>
> Yes, there are multiple scenarios here. e.g.
> 1. pairwise processing.
> a1 + b1
> a2 + b2
> a3 + b3
> ...
>
> 2. combinatorial processing
> a1 + b1
> a1 + b2
> a1 + b3
> a2 + b1
> a2 + b2
> ....
>
> (maybe others too)
>
>
> Clearly the inputs in scenario 1 have to be matched, and if they are
> not, some exit rules would be required.
> Also, as you have mentioned before there are potential memory issues 
> with combinatorial processing of large sets of data, but I think 
> solutions can be found for this.
>
In general I prefer to stick to scenario 1. Any deviation can be handled 
by making a stage behave differently rather than putting this capability 
into the framework. A stage can be written to do nothing but accept 
pairwise arguments and emit combinatorial output; it can then be
inserted before any stage to which you want to add combinatorial input 
processing.
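
A minimal sketch of such an adapter stage, assuming it is handed matched pairs one at a time and forwards every combination exactly once. The class CombinatorialAdapter and its process method are hypothetical names, not part of the Commons Pipeline API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical adapter: accepts pairwise input, emits combinatorial output.
final class CombinatorialAdapter<A, B> {
    private final List<A> seenA = new ArrayList<>();
    private final List<B> seenB = new ArrayList<>();
    private final BiConsumer<A, B> downstream;

    CombinatorialAdapter(BiConsumer<A, B> downstream) {
        this.downstream = downstream;
    }

    /** Accepts one matched pair and emits every combination not emitted before. */
    void process(A a, B b) {
        for (B earlierB : seenB) {
            downstream.accept(a, earlierB);   // new a with each earlier b
        }
        for (A earlierA : seenA) {
            downstream.accept(earlierA, b);   // each earlier a with new b
        }
        downstream.accept(a, b);              // the new pair itself
        seenA.add(a);
        seenB.add(b);
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        CombinatorialAdapter<String, String> adapter =
                new CombinatorialAdapter<>((a, b) -> out.add(a + "+" + b));
        adapter.process("a1", "b1");
        adapter.process("a2", "b2");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the adapter has to remember every value it has seen, which is where the memory concern for large data sets comes from.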

Ken



Re: [PIPELINE] Questions about pipeline

Posted by Tim Dudgeon <td...@informaticsmatters.com>.
Ken Tanaka wrote:
> 
> 
> Tim Dudgeon wrote:
>> See comments below.
>>
>> Tim
>>
>> Ken Tanaka wrote:
>>>
>>>
>>> Tim Dudgeon wrote:
>>>> Ken Tanaka wrote:
>>>>> Hi Tim, 
>>> ...
>>>>>>
>>>>> At present, the structure for storing stages is a linked list, and 
>>>>> branches are implemented as additional pipelines accessed by a name 
>>>>> through a HashMap. To generally handle branching and merging, a 
>>>>> directed acyclic graph (DAG) would better serve, but that would 
>>>>> require the pipeline code to be rewritten at this level. Arguments 
>>>>> could also be made for allowing cycles, as in directed graphs, but 
>>>>> that would be harder to debug, and with a GUI might be a step 
>>>>> toward a visual programming language--so I don't think this should 
>>>>> be pursued yet unless there are volunteers...
>>>>>
>>>>
>>>> I agree, a DAG would be better, but cycles could be needed too, so a DG
>>>> would be better still.
>>>> But yes, ideally I want a visual designer too.
>>>>
>>> I'd like a visual designer too at some point, but that's a ways off 
>>> into the future.
>>>>
>>>>>>
>>>>>> Taken together I can see a generalisation here using named ports 
>>>>>> (input and output), which is similar, but not identical, to your
>>>>>> current concept of branches.
>>>>>>
>>>>>> So you have:
>>>>>> BaseStage.emit(String branch, Object obj);
>>>>>> whereas I would conceptually see this as:
>>>>>> emit(String port, Object obj);
>>>>>> and you have:
>>>>>> Stage.process(Object obj);
>>>>>> whereas I would conceptually see this as:
>>>>>> Stage.process(String port, Object obj);
>>>>>>
>>>>>> And when a pipeline is being assembled a downstream stage is 
>>>>>> attached to a particular port of a stage, not the stage itself. It 
>>>>>> then just receives data sent to that particular port, but not the
>>>>>> other ports.
>>>>> I could see that this would work, but would need either modifying a 
>>>>> number of stages already written, or maybe creating a compatibility 
>>>>> stage driver that takes older style stages so that the input object 
>>>>> comes from a configured port name, usually "input", and sends the
>>>>> output to configured output ports named "output" and whatever the
>>>>> previous branch name(s) were, if any. Stages that used to look for 
>>>>> events for input should be rewritten to read multiple inputs ( 
>>>>> Stage.process(String port, Object obj) as you suggested). Events 
>>>>> would then be reserved for truly out-of-band signals between stages 
>>>>> rather than carrying data for processing.
>>>>
>>>> Agreed, I think that would be good. I think existing stages could be
>>>> made compatible by having a default input and output port, and using
>>>> those if no specific port was specified.
>>>> A default in/out port would probably be necessary to allow simple 
>>>> auto-wiring.
>>>>
>>>>>>
>>>>>> I'd love to hear how compatible the current system is with this 
>>>>>> way of seeing things. Are we just talking about a new type of 
>>>>>> Stage implementation, or a more fundamental incompatibility at the 
>>>>>> API level?
>>>>>>
>>>>> I think you have some good ideas. This is changing the Stage 
>>>>> implementation, which affects on the order of 60 stages for us that 
>>>>> override the process method, unless the compatibility stage driver 
>>>>> works out. The top level pipeline would also be restructured. The 
>>>>> amount of work required puts this out of the near term for me to 
>>>>> work on it, but there may be other developers/contributors to take 
>>>>> this on.
>>>>
>>>> I need to investigate more fully here, and consider the other options.
>>>> But potentially this is certainly of interest.
>>>>
>>>> So is all that's necessary to prototype this to create a new Stage 
>>>> implementation, with new emit( ... ) and process( ... ) methods?
>>> I'm thinking it's more involved than that. To really deal well with 
>>> the arbitrary number of downstream stages rather than just one means 
>>> changing the digester rules 
>>> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
>>> on specifying what follows. Normally a stage is connected to the 
>>> preceding stage if it is listed in that order in the configuration 
>>> file. This should be a default behavior, but if  stage2 and stage3 
>>> both follow stage1 then some notation of which is the previous stage 
>>> is needed.
>>>
>>> stage1----stage2
>>>    |
>>>    |-----stage3
>>>
>>> might be set up as conf_pipe.xml:
>>> <pipeline>
>>>   ...
>>>   <stage className="com.demo.pipeline.stages.Stage1" 
>>> driverFactoryId="df1" stageId="stage1"/>
>>>   <stage className="com.demo.pipeline.stages.Stage2" 
>>> driverFactoryId="df1"/>
>>>   <stage className="com.demo.pipeline.stages.Stage3" 
>>> driverFactoryId="df1" follows="stage1"/>
>>> </pipeline>
>>>
>>> I propose the 'follows="stage1"' attribute to connect stage3 to 
>>> stage1 instead of stage2 immediately preceding. This seems cleaner 
>>> than setting up a branch and matching up branch key names between the 
>>> branching stage and the secondary pipeline(s). Can you think of a 
>>> cleaner way to configure this?
>>
>> I think we're in danger of looking at this the wrong way. The XML 
>> should reflect the underlying data model, not drive it. But to stick 
>> with this paradigm I would think it might be best to explicitly define
>> the connections in the model definition. Maybe something more like this:
>>
>> <pipeline>
>>   ...
>>   <stage className="com.demo.pipeline.stages.Stage1"        
>> driverFactoryId="df1" stageId="stage1">
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage2"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
> Just to clarify, for Stage2, when you specify '<input stageId="stage1" 
> outputPort="pass"/>', 'outputPort="pass"' refers to an output port of 
> stage1 named "pass", and does not specify that the stage2 output
> port is named "pass", right? So Stage1 has two output ports, named 
> "pass" and "fail", and this would be documented somewhere so you knew 
> what to connect to when you wrote the configuration XML?

Yes. That's the idea. Maybe different names might help here.


>>   <stage className="com.demo.pipeline.stages.Stage3"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage4"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>>   </stage>
> So here Stage4 has an input port named "aPort" and it is loaded from the 
> stage1 output port named "fail"?


Correct again.

>> </pipeline>
>>
>> I think this would allow more flexibility, as:
>> 1. a stage could define multiple inputs if it needed to.
> If I understand you correctly, suppose there is a stage5 that has input 
> ports "aPort" and "bPort" that we would like to receive data from stage2 
> and stage3 ("pass" output port from both). Then it would be specified as 
> follows:
> 
>  <stage className="com.demo.pipeline.stages.Stage5"
>    driverFactoryId="df1">
>    <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
>    <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
>  </stage>
> 

Correct again.

> I also assume that Stage2 and Stage3 are given stageIds of "stage2" and 
> "stage3" respectively.
> 
> [stage1]------------>[stage2]------------>[stage5]
>     |   pass->(in)           pass->aPort   ^
>     |                                      |
>     +-------------->[stage3]---------------+
>     |   pass->(in)           pass->bPort
>     |
>     +-------------->[stage4]
>         fail->aPort
> 

That's the idea. It's obviously a bit of an extreme case, as it uses both
branching and merging, but it covers what I think is needed.
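
The routing implied by the diagram above can be sketched in Java as follows. This is only an illustration under stated assumptions: PortStage, Target and Emitter are hypothetical names, the default input port name "input" is taken from the earlier suggestion in this thread, and none of this is existing Commons Pipeline code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of port-based routing; not the Commons Pipeline API.
final class PortWiringSketch {
    /** A stage that receives objects on named input ports. */
    interface PortStage {
        void process(String port, Object obj);
    }

    /** One connection target: a downstream stage and the input port to deliver to. */
    static final class Target {
        final PortStage stage;
        final String inputPort;

        Target(PortStage stage, String inputPort) {
            this.stage = stage;
            this.inputPort = inputPort;
        }
    }

    /** Routes objects emitted on a named output port to every connected target. */
    static final class Emitter {
        private final Map<String, List<Target>> targetsByOutputPort = new HashMap<>();

        void connect(String outputPort, PortStage stage, String inputPort) {
            targetsByOutputPort
                    .computeIfAbsent(outputPort, k -> new ArrayList<>())
                    .add(new Target(stage, inputPort));
        }

        void emit(String outputPort, Object obj) {
            for (Target t : targetsByOutputPort.getOrDefault(outputPort, List.of())) {
                t.stage.process(t.inputPort, obj);
            }
        }
    }

    /** Wires stage1's "pass" port to stage2 and stage3, and its "fail" port to stage4. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Emitter stage1 = new Emitter();
        stage1.connect("pass", (port, obj) -> log.add("stage2." + port + "=" + obj), "input");
        stage1.connect("pass", (port, obj) -> log.add("stage3." + port + "=" + obj), "input");
        stage1.connect("fail", (port, obj) -> log.add("stage4." + port + "=" + obj), "aPort");
        stage1.emit("pass", "x");
        stage1.emit("fail", "y");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

An object emitted on "pass" reaches both stage2 and stage3, while an object emitted on "fail" reaches only stage4 on its "aPort" input.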


>> 2. each connection is explicitly defined and could have extra
>> attributes added in future (e.g. a disable attribute to disable
>> execution of that part of the pipeline).
>> 3. The concept of input can probably be generalised to include the 
>> "feed", allowing multiple feeds to be used (as discussed earlier in 
>> this thread). e.g. stage1 would also have an input that would be the 
>> feed.
>>
> Do you envision a stage with two inputs (aPort and bPort) waiting until 
> there are inputs on both before its stageDriver invokes the process 
> method? If stage5 needs two inputs, and stage2 provides 3 values and 
> stage3 provides 2 values, there are just 2 complete pairs of values. The 
> third value from stage2 could wait indefinitely for a matching input 
> from stage3. Currently stages run until their queue is empty, but with 
> multiple inputs that could be imbalanced, it might be better to quit when
> any one queue is empty and all upstream stages report that they are
> complete. Any non-empty queues on exit can trigger a warning.

Yes, there are multiple scenarios here. e.g.
1. pairwise processing.
a1 + b1
a2 + b2
a3 + b3
...

2. combinatorial processing
a1 + b1
a1 + b2
a1 + b3
a2 + b1
a2 + b2
....

(maybe others too)


Clearly the inputs in scenario 1 have to be matched, and if they are not,
some exit rules would be required.
Also, as you have mentioned before there are potential memory issues 
with combinatorial processing of large sets of data, but I think 
solutions can be found for this.


Tim


>>
>>>
>>> The Pipeline.java class will need to be modified to build and 
>>> maintain a DAG structure rather than a linked list. The incoming data 
>>> are managed by a queue in the stage driver, which would change to a 
>>> group of queues, allowing multiple inputs (ports). I'm assuming there 
>>> is an open source directed acyclic graph library out there that can 
>>> replace the linked list.
>>
>> If defined as I propose I'm not sure a specific graph library is 
>> necessary. The model just comprises a set of stages that know how they 
>> are connected, i.e. the connections are already implicit in the model.
>> But this probably needs more thought.
>>
> Currently the linked list of stages is used for lazy initialization, to 
> find the next stage feeder the first time it is used. To allow general 
> connections, the downstream feeder link could become an array of 
> subsequent stageDrivers, with the connections set up as the pipeline is 
> built. In that case a DAG library would not be needed, and we
> could keep the linked list as is.
> 
> 
> -Ken




Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.

Tim Dudgeon wrote:
> See comments below.
>
> Tim
>
> Ken Tanaka wrote:
>>
>>
>> Tim Dudgeon wrote:
>>> Ken Tanaka wrote:
>>>> Hi Tim, 
>> ...
>>>>>
>>>> At present, the structure for storing stages is a linked list, and 
>>>> branches are implemented as additional pipelines accessed by a name 
>>>> through a HashMap. To generally handle branching and merging, a 
>>>> directed acyclic graph (DAG) would better serve, but that would 
>>>> require the pipeline code to be rewritten at this level. Arguments 
>>>> could also be made for allowing cycles, as in directed graphs, but 
>>>> that would be harder to debug, and with a GUI might be a step 
>>>> toward a visual programming language--so I don't think this should 
>>>> be pursued yet unless there are volunteers...
>>>>
>>>
>>> I agree, a DAG would be better, but cycles could be needed too, so a DG
>>> would be better still.
>>> But yes, ideally I want a visual designer too.
>>>
>> I'd like a visual designer too at some point, but that's a ways off 
>> into the future.
>>>
>>>>>
>>>>> Taken together I can see a generalisation here using named ports 
>>>>> (input and output), which is similar, but not identical, to your
>>>>> current concept of branches.
>>>>>
>>>>> So you have:
>>>>> BaseStage.emit(String branch, Object obj);
>>>>> whereas I would conceptually see this as:
>>>>> emit(String port, Object obj);
>>>>> and you have:
>>>>> Stage.process(Object obj);
>>>>> whereas I would conceptually see this as:
>>>>> Stage.process(String port, Object obj);
>>>>>
>>>>> And when a pipeline is being assembled a downstream stage is 
>>>>> attached to a particular port of a stage, not the stage itself. It 
>>>>> then just receives data sent to that particular port, but not the
>>>>> other ports.
>>>> I could see that this would work, but would need either modifying a 
>>>> number of stages already written, or maybe creating a compatibility 
>>>> stage driver that takes older style stages so that the input object 
>>>> comes from a configured port name, usually "input", and sends the
>>>> output to configured output ports named "output" and whatever the
>>>> previous branch name(s) were, if any. Stages that used to look for 
>>>> events for input should be rewritten to read multiple inputs ( 
>>>> Stage.process(String port, Object obj) as you suggested). Events 
>>>> would then be reserved for truly out-of-band signals between stages 
>>>> rather than carrying data for processing.
>>>
>>> Agreed, I think that would be good. I think existing stages could be
>>> made compatible by having a default input and output port, and using
>>> those if no specific port was specified.
>>> A default in/out port would probably be necessary to allow simple 
>>> auto-wiring.
>>>
>>>>>
>>>>> I'd love to hear how compatible the current system is with this 
>>>>> way of seeing things. Are we just talking about a new type of 
>>>>> Stage implementation, or a more fundamental incompatibility at the 
>>>>> API level?
>>>>>
>>>> I think you have some good ideas. This is changing the Stage 
>>>> implementation, which affects on the order of 60 stages for us that 
>>>> override the process method, unless the compatibility stage driver 
>>>> works out. The top level pipeline would also be restructured. The 
>>>> amount of work required puts this out of the near term for me to 
>>>> work on it, but there may be other developers/contributors to take 
>>>> this on.
>>>
>>> I need to investigate more fully here, and consider the other options.
>>> But potentially this is certainly of interest.
>>>
>>> So is all that's necessary to prototype this to create a new Stage 
>>> implementation, with new emit( ... ) and process( ... ) methods?
>> I'm thinking it's more involved than that. To really deal well with 
>> the arbitrary number of downstream stages rather than just one means 
>> changing the digester rules 
>> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
>> on specifying what follows. Normally a stage is connected to the 
>> preceding stage if it is listed in that order in the configuration 
>> file. This should be a default behavior, but if  stage2 and stage3 
>> both follow stage1 then some notation of which is the previous stage 
>> is needed.
>>
>> stage1----stage2
>>    |
>>    |-----stage3
>>
>> might be set up as conf_pipe.xml:
>> <pipeline>
>>   ...
>>   <stage className="com.demo.pipeline.stages.Stage1" 
>> driverFactoryId="df1" stageId="stage1"/>
>>   <stage className="com.demo.pipeline.stages.Stage2" 
>> driverFactoryId="df1"/>
>>   <stage className="com.demo.pipeline.stages.Stage3" 
>> driverFactoryId="df1" follows="stage1"/>
>> </pipeline>
>>
>> I propose the 'follows="stage1"' attribute to connect stage3 to 
>> stage1 instead of stage2 immediately preceding. This seems cleaner 
>> than setting up a branch and matching up branch key names between the 
>> branching stage and the secondary pipeline(s). Can you think of a 
>> cleaner way to configure this?
>
> I think we're in danger of looking at this the wrong way. The XML 
> should reflect the underlying data model, not drive it. But to stick 
> with this paradigm I would think it might be best to explicitly define
> the connections in the model definition. Maybe something more like this:
>
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1"    
>     driverFactoryId="df1" stageId="stage1">
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage2"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
Just to clarify, for Stage2, when you specify '<input stageId="stage1" 
outputPort="pass"/>', 'outputPort="pass"' refers to an output port of 
stage1 named "pass", and does not specify that the stage2 output
port is named "pass", right? So Stage1 has two output ports, named 
"pass" and "fail", and this would be documented somewhere so you knew 
what to connect to when you wrote the configuration XML?
>   <stage className="com.demo.pipeline.stages.Stage3"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage4"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>   </stage>
So here Stage4 has an input port named "aPort" and it is loaded from the 
stage1 output port named "fail"?
> </pipeline>
>
> I think this would allow more flexibility, as:
> 1. a stage could define multiple inputs if it needed to.
If I understand you correctly, suppose there is a stage5 that has input 
ports "aPort" and "bPort" that we would like to receive data from stage2 
and stage3 ("pass" output port from both). Then it would be specified as 
follows:

  <stage className="com.demo.pipeline.stages.Stage5"
    driverFactoryId="df1">
    <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
    <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
  </stage>

I also assume that Stage2 and Stage3 are given stageIds of "stage2" and 
"stage3" respectively.

[stage1]------------>[stage2]------------>[stage5]
     |   pass->(in)           pass->aPort   ^
     |                                      |
     +-------------->[stage3]---------------+
     |   pass->(in)           pass->bPort
     |
     +-------------->[stage4]
         fail->aPort

> 2. each connection is explicitly defined and could have extra
> attributes added in future (e.g. a disable attribute to disable
> execution of that part of the pipeline).
> 3. The concept of input can probably be generalised to include the 
> "feed", allowing multiple feeds to be used (as discussed earlier in 
> this thread). e.g. stage1 would also have an input that would be the 
> feed.
>
Do you envision a stage with two inputs (aPort and bPort) waiting until 
there are inputs on both before its stageDriver invokes the process 
method? If stage5 needs two inputs, and stage2 provides 3 values and 
stage3 provides 2 values, there are just 2 complete pairs of values. The 
third value from stage2 could wait indefinitely for a matching input 
from stage3. Currently stages run until their queue is empty, but with 
multiple inputs that could be imbalanced, it might be better to quit when
any one queue is empty and all upstream stages report that they are
complete. Any non-empty queues on exit can trigger a warning.
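
A minimal sketch of that quit condition, simplified to a single thread and assuming both upstream stages have already reported completion before draining starts. PairwiseJoinSketch, Result and drain are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of pairwise joining with a warning for leftovers.
final class PairwiseJoinSketch {
    /** Result of draining two input queues pairwise. */
    static final class Result {
        final List<String> pairs = new ArrayList<>();
        final List<String> warnings = new ArrayList<>();
    }

    /**
     * Pairs values from both queues. Assumes both upstream stages are complete,
     * so an empty queue means no more data will arrive on that port.
     */
    static Result drain(Queue<String> aPort, Queue<String> bPort) {
        Result result = new Result();
        // Quit as soon as either queue is empty: no further complete pair exists.
        while (!aPort.isEmpty() && !bPort.isEmpty()) {
            result.pairs.add(aPort.remove() + "+" + bPort.remove());
        }
        if (!aPort.isEmpty()) {
            result.warnings.add("aPort has " + aPort.size() + " unmatched value(s)");
        }
        if (!bPort.isEmpty()) {
            result.warnings.add("bPort has " + bPort.size() + " unmatched value(s)");
        }
        return result;
    }

    /** The case from the thread: stage2 provides 3 values, stage3 provides 2. */
    static Result demo() {
        Queue<String> a = new ArrayDeque<>(List.of("a1", "a2", "a3"));
        Queue<String> b = new ArrayDeque<>(List.of("b1", "b2"));
        return drain(a, b);
    }

    public static void main(String[] args) {
        Result r = demo();
        System.out.println(r.pairs + " " + r.warnings);
    }
}
```

In a real multi-threaded stage driver the emptiness check would also have to consult the upstream completion state, since an empty queue alone does not prove that no more data is coming.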
>
>>
>> The Pipeline.java class will need to be modified to build and 
>> maintain a DAG structure rather than a linked list. The incoming data 
>> are managed by a queue in the stage driver, which would change to a 
>> group of queues, allowing multiple inputs (ports). I'm assuming there 
>> is an open source directed acyclic graph library out there that can 
>> replace the linked list.
>
> If defined as I propose I'm not sure a specific graph library is 
> necessary. The model just comprises a set of stages that know how they 
> are connected, i.e. the connections are already implicit in the model.
> But this probably needs more thought.
>
Currently the linked list of stages is used for lazy initialization, to 
find the next stage feeder the first time it is used. To allow general 
connections, the downstream feeder link could become an array of 
subsequent stageDrivers, with the connections set up as the pipeline is 
built. In that case a DAG library would not be needed, and we
could keep the linked list as is.


-Ken



Re: [PIPELINE] Questions about pipeline

Posted by Tim Dudgeon <td...@informaticsmatters.com>.
See comments below.

Tim

Ken Tanaka wrote:
> 
> 
> Tim Dudgeon wrote:
>> Ken Tanaka wrote:
>>> Hi Tim, 
> ...
>>>>
>>> At present, the structure for storing stages is a linked list, and 
>>> branches are implemented as additional pipelines accessed by a name 
>>> through a HashMap. To generally handle branching and merging, a 
>>> directed acyclic graph (DAG) would better serve, but that would 
>>> require the pipeline code to be rewritten at this level. Arguments 
>>> could also be made for allowing cycles, as in directed graphs, but 
>>> that would be harder to debug, and with a GUI might be a step toward 
>>> a visual programming language--so I don't think this should be 
>>> pursued yet unless there are volunteers...
>>>
>>
>> I agree, a DAG would be better, but cycles could be needed too, so a DG
>> would be better still.
>> But yes, ideally I want a visual designer too.
>>
> I'd like a visual designer too at some point, but that's a ways off into 
> the future.
>>
>>>>
>>>> Taken together I can see a generalisation here using named ports 
>>>> (input and output), which is similar, but not identical, to your
>>>> current concept of branches.
>>>>
>>>> So you have:
>>>> BaseStage.emit(String branch, Object obj);
>>>> whereas I would conceptually see this as:
>>>> emit(String port, Object obj);
>>>> and you have:
>>>> Stage.process(Object obj);
>>>> whereas I would conceptually see this as:
>>>> Stage.process(String port, Object obj);
>>>>
>>>> And when a pipeline is being assembled a downstream stage is 
>>>> attached to a particular port of a stage, not the stage itself. It 
>>>> then just receives data sent to that particular port, but not the
>>>> other ports.
>>> I could see that this would work, but would need either modifying a 
>>> number of stages already written, or maybe creating a compatibility 
>>> stage driver that takes older style stages so that the input object 
>>> comes from a configured port name, usually "input", and sends the
>>> output to configured output ports named "output" and whatever the
>>> previous branch name(s) were, if any. Stages that used to look for 
>>> events for input should be rewritten to read multiple inputs ( 
>>> Stage.process(String port, Object obj) as you suggested). Events 
>>> would then be reserved for truly out-of-band signals between stages 
>>> rather than carrying data for processing.
>>
>> Agreed, I think that would be good. I think existing stages could be
>> made compatible by having a default input and output port, and using
>> those if no specific port was specified.
>> A default in/out port would probably be necessary to allow simple 
>> auto-wiring.
>>
>>>>
>>>> I'd love to hear how compatible the current system is with this way 
>>>> of seeing things. Are we just talking about a new type of Stage 
>>>> implementation, or a more fundamental incompatibility at the API level?
>>>>
>>> I think you have some good ideas. This is changing the Stage 
>>> implementation, which affects on the order of 60 stages for us that 
>>> override the process method, unless the compatibility stage driver 
>>> works out. The top level pipeline would also be restructured. The 
>>> amount of work required puts this out of the near term for me to work 
>>> on it, but there may be other developers/contributors to take this on.
>>
>> I need to investigate more fully here, and consider the other options.
>> But potentially this is certainly of interest.
>>
>> So is all that's necessary to prototype this to create a new Stage 
>> implementation, with new emit( ... ) and process( ... ) methods?
> I'm thinking it's more involved than that. To really deal well with the 
> arbitrary number of downstream stages rather than just one means 
> changing the digester rules 
> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
> on specifying what follows. Normally a stage is connected to the 
> preceding stage if it is listed in that order in the configuration file. 
> This should be a default behavior, but if  stage2 and stage3 both follow 
> stage1 then some notation of which is the previous stage is needed.
> 
> stage1----stage2
>    |
>    |-----stage3
> 
> might be set up as conf_pipe.xml:
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1" 
> driverFactoryId="df1" stageId="stage1"/>
>   <stage className="com.demo.pipeline.stages.Stage2" 
> driverFactoryId="df1"/>
>   <stage className="com.demo.pipeline.stages.Stage3" 
> driverFactoryId="df1" follows="stage1"/>
> </pipeline>
> 
> I propose the 'follows="stage1"' attribute to connect stage3 to stage1 
> instead of stage2 immediately preceding. This seems cleaner than setting 
> up a branch and matching up branch key names between the branching stage 
> and the secondary pipeline(s). Can you think of a cleaner way to 
> configure this?

I think we're in danger of looking at this the wrong way. The XML should 
reflect the underlying data model, not drive it. But to stick with this 
paradigm I would think it might be best to explicitly define the
connections in the model definition. Maybe something more like this:

<pipeline>
  ...
  <stage className="com.demo.pipeline.stages.Stage1"
    driverFactoryId="df1" stageId="stage1">
  </stage>
  <stage className="com.demo.pipeline.stages.Stage2"
    driverFactoryId="df1">
    <input stageId="stage1" outputPort="pass"/>
  </stage>
  <stage className="com.demo.pipeline.stages.Stage3"
    driverFactoryId="df1">
    <input stageId="stage1" outputPort="pass"/>
  </stage>
  <stage className="com.demo.pipeline.stages.Stage4"
    driverFactoryId="df1">
    <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
  </stage>
</pipeline>

I think this would allow more flexibility, as:
1. a stage could define multiple inputs if it needed to.
2. each connection is explicitly defined and could have extra attributes
added in future (e.g. a disable attribute to disable execution of that
part of the pipeline).
3. The concept of input can probably be generalised to include the 
"feed", allowing multiple feeds to be used (as discussed earlier in this 
thread). e.g. stage1 would also have an input that would be the feed.
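
To make the proposed model concrete, here is a rough Java sketch of the data the XML above would populate. All names (ConnectionModelSketch, StageDef, Input, the default port names "input" and "output", and the disabled flag) are assumptions for illustration; the stage ids for Stage2 to Stage4 are also assumed, since the XML only gives one for Stage1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical data model for explicitly defined connections.
final class ConnectionModelSketch {
    static final String DEFAULT_OUTPUT = "output";
    static final String DEFAULT_INPUT = "input";

    /** One <input .../> element: where the data comes from and which port receives it. */
    static final class Input {
        final String stageId;
        final String outputPort;
        final String inputPort;
        final boolean disabled;

        Input(String stageId, String outputPort, String inputPort, boolean disabled) {
            this.stageId = stageId;
            // Fall back to the default ports when none is specified.
            this.outputPort = outputPort == null ? DEFAULT_OUTPUT : outputPort;
            this.inputPort = inputPort == null ? DEFAULT_INPUT : inputPort;
            this.disabled = disabled;
        }
    }

    /** One <stage> element together with the inputs declared inside it. */
    static final class StageDef {
        final String stageId;
        final List<Input> inputs = new ArrayList<>();

        StageDef(String stageId) {
            this.stageId = stageId;
        }

        StageDef input(String fromStageId, String outputPort, String inputPort) {
            inputs.add(new Input(fromStageId, outputPort, inputPort, false));
            return this;
        }

        StageDef disabledInput(String fromStageId, String outputPort, String inputPort) {
            inputs.add(new Input(fromStageId, outputPort, inputPort, true));
            return this;
        }
    }

    /** Lists every enabled connection as "source.port -> target.port". */
    static List<String> describe(List<StageDef> stages) {
        List<String> lines = new ArrayList<>();
        for (StageDef stage : stages) {
            for (Input in : stage.inputs) {
                if (!in.disabled) {
                    lines.add(in.stageId + "." + in.outputPort
                            + " -> " + stage.stageId + "." + in.inputPort);
                }
            }
        }
        return lines;
    }

    static List<String> demo() {
        return describe(List.of(
                new StageDef("stage1"),
                new StageDef("stage2").input("stage1", "pass", null),
                new StageDef("stage3").input("stage1", "pass", null),
                new StageDef("stage4").input("stage1", "fail", "aPort")));
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```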


> 
> The Pipeline.java class will need to be modified to build and maintain a 
> DAG structure rather than a linked list. The incoming data are managed 
> by a queue in the stage driver, which would change to a group of queues, 
> allowing multiple inputs (ports). I'm assuming there is an open source 
> directed acyclic graph library out there that can replace the linked list.

If defined as I propose, I'm not sure a specific graph library is 
necessary. The model just comprises a set of stages that know how they 
are connected, i.e. the connections are already implicit in the model.
But this probably needs more thought.
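
To make the "stages that know how they are connected" idea concrete, here is a rough sketch. It assumes each stage simply records its <input> elements, and it finds the downstream stages of a port by scanning those records. The names StageModel, Input, connect and downstreamOf are hypothetical and are not part of Commons Pipeline.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: every stage records which upstream stage/port feeds it. */
final class StageModel {
    /** Mirrors one <input stageId=".." outputPort=".."/> element. */
    static final class Input {
        final String stageId;
        final String outputPort;

        Input(String stageId, String outputPort) {
            this.stageId = stageId;
            this.outputPort = outputPort;
        }
    }

    // Stage id -> the inputs declared on that stage, in declaration order.
    private final Map<String, List<Input>> inputsByStage = new LinkedHashMap<>();

    void addStage(String stageId) {
        inputsByStage.putIfAbsent(stageId, new ArrayList<>());
    }

    /** Records that toStage reads from outputPort of fromStage. */
    void connect(String fromStage, String outputPort, String toStage) {
        addStage(fromStage);
        addStage(toStage);
        inputsByStage.get(toStage).add(new Input(fromStage, outputPort));
    }

    /** Finds the stages attached to a port by scanning the declared inputs. */
    List<String> downstreamOf(String stageId, String outputPort) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Input>> entry : inputsByStage.entrySet()) {
            for (Input in : entry.getValue()) {
                if (in.stageId.equals(stageId) && in.outputPort.equals(outputPort)) {
                    result.add(entry.getKey());
                }
            }
        }
        return result;
    }
}
```

No separate graph structure is stored here; the graph is derived on demand from the inputs. A real implementation would probably also want cycle detection if only DAGs are allowed.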



> 
> -Ken


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@commons.apache.org
For additional commands, e-mail: user-help@commons.apache.org


Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>> Hi Tim, 
...
>>>
>> At present, the structure for storing stages is a linked list, and 
>> branches are implemented as additional pipelines accessed by a name 
>> through a HashMap. To generally handle branching and merging, a 
>> directed acyclic graph (DAG) would better serve, but that would 
>> require the pipeline code to be rewritten at this level. Arguments 
>> could also be made for allowing cycles, as in directed graphs, but 
>> that would be harder to debug, and with a GUI might be a step toward 
>> a visual programming language--so I don't think this should be 
>> pursued yet unless there are volunteers...
>>
>
> I agree, DAG would be better, but cycles could be needed too, so DG 
> would be better too.
> But, yes, I am ideally wanting visual designer too.
>
I'd like a visual designer too at some point, but that's a ways off into 
the future.
>
>>>
>>> Taken together I can see a generalisation here using named ports 
>>> (input and output), which is similar, but not identical, to your 
>>> current concept of branches.
>>>
>>> So you have:
>>> BaseStage.emit(String branch, Object obj);
>>> whereas I would conceptually see this as:
>>> emit(String port, Object obj);
>>> and you have:
>>> Stage.process(Object obj);
>>> whereas I would conceptually see this as:
>>> Stage.process(String port, Object obj);
>>>
>>> And when a pipeline is being assembled a downstream stage is 
>>> attached to a particular port of a stage, not the stage itself. It 
>>> then just receives data sent to that particular port, but not the 
>>> other ports.
>> I could see that this would work, but would need either modifying a 
>> number of stages already written, or maybe creating a compatibility 
>> stage driver that takes older style stages so that the input object 
>> comes from a configured port name, usually "input", and sends the 
>> output to configured output ports named "output" and whatever the 
>> previous branch name(s) were, if any. Stages that used to look for 
>> events for input should be rewritten to read multiple inputs ( 
>> Stage.process(String port, Object obj) as you suggested). Events 
>> would then be reserved for truly out-of-band signals between stages 
>> rather than carrying data for processing.
>
> Agreed, I think this would be good. I think existing stages could be 
> made compatible by having a default input and output port, and using 
> those if no specific port was specified.
> A default in/out port would probably be necessary to allow simple 
> auto-wiring.
>
>>>
>>> I'd love to hear how compatible the current system is with this way 
>>> of seeing things. Are we just talking about a new type of Stage 
>>> implementation, or a more fundamental incompatibility at the API level.
>>>
>> I think you have some good ideas. This is changing the Stage 
>> implementation, which affects on the order of 60 stages for us that 
>> override the process method, unless the compatibility stage driver 
>> works out. The top level pipeline would also be restructured. The 
>> amount of work required puts this out of the near term for me to work 
>> on it, but there may be other developers/contributors to take this on.
>
> I need to investigate more fully here, and consider the other options.
> But potentially this is certainly of interest.
>
> So is all that's necessary to prototype this to create a new Stage 
> implementation, with new emit( ... ) and process( ... ) methods?
I think it's more involved than that. Handling an arbitrary number of 
downstream stages, rather than just one, means changing the digester rules 
<http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html> 
that specify what follows what. Normally a stage is connected to the 
preceding stage if it is listed in that order in the configuration file. 
This should remain the default behavior, but if stage2 and stage3 both follow 
stage1 then some notation for naming the previous stage is needed.

stage1----stage2
    |
    |-----stage3

might be set up as conf_pipe.xml:
<pipeline>
   ...
   <stage className="com.demo.pipeline.stages.Stage1"
          driverFactoryId="df1" stageId="stage1"/>
   <stage className="com.demo.pipeline.stages.Stage2"
          driverFactoryId="df1"/>
   <stage className="com.demo.pipeline.stages.Stage3"
          driverFactoryId="df1" follows="stage1"/>
</pipeline>

I propose the 'follows="stage1"' attribute to connect stage3 to stage1 
instead of to the immediately preceding stage2. This seems cleaner than setting 
up a branch and matching up branch key names between the branching stage 
and the secondary pipeline(s). Can you think of a cleaner way to 
configure this?
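
As a rough sketch of how such a rule might resolve connections: a stage attaches to the stage named by 'follows' when it is present, and otherwise to the stage listed just before it. FollowsResolver and its array-based input are hypothetical, and stage2 and stage3 are given ids here purely for illustration. None of this is existing digester code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resolver for the proposed 'follows' attribute. */
final class FollowsResolver {
    /**
     * Each entry is {stageId, follows}; follows is null when the attribute
     * is absent. Returns edges written as "upstream->downstream".
     */
    static List<String> edges(String[][] stages) {
        List<String> edges = new ArrayList<>();
        String previous = null; // stage listed immediately before the current one
        for (String[] stage : stages) {
            String id = stage[0];
            // Explicit 'follows' wins; otherwise fall back to listing order.
            String upstream = (stage[1] != null) ? stage[1] : previous;
            if (upstream != null) {
                edges.add(upstream + "->" + id);
            }
            previous = id;
        }
        return edges;
    }
}
```

For the three-stage configuration above this gives stage1->stage2 and stage1->stage3, so configurations without the attribute keep their current linear meaning.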

The Pipeline.java class will need to be modified to build and maintain a 
DAG structure rather than a linked list. The incoming data are managed 
by a queue in the stage driver, which would change to a group of queues, 
allowing multiple inputs (ports). I'm assuming there is an open source 
directed acyclic graph library out there that can replace the linked list.

-Ken



Re: [PIPELINE] Questions about pipeline

Posted by Tim Dudgeon <td...@informaticsmatters.com>.
Ken Tanaka wrote:
> Hi Tim,
> 
> Tim Dudgeon wrote:
>> Hi Ken,
>>
>> Thanks for the rapid response.
>> First, let me explain some background here.
>> I am looking for Java based pipelining solutions to incorporate into 
>> an existing application. The use of pipelining is well established in 
>> the  sector, with applications like Pipeline Pilot and Knime, and so 
>> many of the common needs have been well established over several years 
>> by these applications.
> Have you also looked at Pentaho?
I took a look, but it doesn't seem to be what I'm after.

>>
>> Key issues that my initial investigations of Jakarta Pipeline seem to 
>> identify are:
>>
>> 1. Branching is very common. This typically takes 2 forms:
>> 1.1. Splitting data. A stage could (for instance) have 2 output ports, 
>> "pass" and "fail". Data is processed by the stage and sent to 
>> whichever port is appropriate. Different stages would be attached to 
>> each port, resulting in the pipeline being branched by this pass/fail 
>> decision.
>> 1.2. Attaching multiple stages to a particular output port.
>> The stage just sends its output onwards. It has no interest in what 
>> happens once the data is sent, and is not concerned whether zero, one 
>> or  100 stages receive the output. This is the stage1,2,3,4 scenario I 
>> outlined previously.
>>
>> 2. Merging is also common (though less common than branching).
>> By analogy with branching, I would see this conceptually as a stage 
>> having multiple input ports (A and B in the merging example).
>>
> At present, the structure for storing stages is a linked list, and 
> branches are implemented as additional pipelines accessed by a name 
> through a HashMap. To generally handle branching and merging, a directed 
> acyclic graph (DAG) would better serve, but that would require the 
> pipeline code to be rewritten at this level. Arguments could also be 
> made for allowing cycles, as in directed graphs, but that would be 
> harder to debug, and with a GUI might be a step toward a visual 
> programming language--so I don't think this should be pursued yet unless 
> there are volunteers...
>

I agree, a DAG would be better, but cycles could be needed too, so a 
directed graph (DG) would be better still.
But yes, ideally I want a visual designer too.


>>
>> Taken together I can see a generalisation here using named ports 
>> (input and output), which is similar, but not identical, to your 
>> current concept of branches.
>>
>> So you have:
>> BaseStage.emit(String branch, Object obj);
>> whereas I would conceptually see this as:
>> emit(String port, Object obj);
>> and you have:
>> Stage.process(Object obj);
>> whereas I would conceptually see this as:
>> Stage.process(String port, Object obj);
>>
>> And when a pipeline is being assembled a downstream stage is attached 
>> to a particular port of a stage, not the stage itself. It then just 
>> receives data sent to that particular port, but not the other ports.
> I could see that this would work, but would need either modifying a 
> number of stages already written, or maybe creating a compatibility 
> stage driver that takes older style stages so that the input object 
> comes from a configured port name, usually "input", and sends the 
> output to configured output ports named "output" and whatever the 
> previous branch name(s) were, if any. Stages that used to look for 
> events for input should be rewritten to read multiple inputs ( 
> Stage.process(String port, Object obj) as you suggested). Events would 
> then be reserved for truly out-of-band signals between stages rather 
> than carrying data for processing.

Agreed, I think this would be good. I think existing stages could be 
made compatible by having a default input and output port, and using 
those if no specific port was specified.
A default in/out port would probably be necessary to allow simple 
auto-wiring.
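
A rough sketch of what I mean by default ports, assuming a wrapper around an old-style stage. LegacyStage, PortAwareStage, PortSink and DefaultPortAdapter are hypothetical names. The real Stage.process(Object) emits its results itself instead of returning them, so the return value here is a simplification.

```java
/** Hypothetical stand-in for an existing single-input stage (simplified). */
interface LegacyStage {
    Object process(Object obj);
}

/** Hypothetical port-aware stage as discussed in this thread. */
interface PortAwareStage {
    void process(String port, Object obj);
}

/** Hypothetical receiver for objects emitted on a named output port. */
interface PortSink {
    void emit(String port, Object obj);
}

/** Wraps a legacy stage so it reads and writes on default ports. */
final class DefaultPortAdapter implements PortAwareStage {
    static final String DEFAULT_INPUT = "input";
    static final String DEFAULT_OUTPUT = "output";

    private final LegacyStage legacy;
    private final PortSink sink;

    DefaultPortAdapter(LegacyStage legacy, PortSink sink) {
        this.legacy = legacy;
        this.sink = sink;
    }

    @Override
    public void process(String port, Object obj) {
        // No port given means the default input port.
        String in = (port == null) ? DEFAULT_INPUT : port;
        if (!DEFAULT_INPUT.equals(in)) {
            return; // a legacy stage only understands its single input
        }
        sink.emit(DEFAULT_OUTPUT, legacy.process(obj));
    }
}
```

Silently dropping data sent to an unknown port is just one possible choice; raising an error at wiring time may well be better.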

>>
>> I'd love to hear how compatible the current system is with this way of 
>> seeing things. Are we just talking about a new type of Stage 
>> implementation, or a more fundamental incompatibility at the API level.
>>
> I think you have some good ideas. This is changing the Stage 
> implementation, which affects on the order of 60 stages for us that 
> override the process method, unless the compatibility stage driver works 
> out. The top level pipeline would also be restructured. The amount of 
> work required puts this out of the near term for me to work on it, but 
> there may be other developers/contributors to take this on.

I need to investigate more fully here, and consider the other options.
But potentially this is certainly of interest.

So is all that's necessary to prototype this to create a new Stage 
implementation, with new emit( ... ) and process( ... ) methods?


Thanks

Tim




> 
> -Ken




Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.
Hi Tim,

Tim Dudgeon wrote:
> Hi Ken,
>
> Thanks for the rapid response.
> First, let me explain some background here.
> I am looking for Java based pipelining solutions to incorporate into 
> an existing application. The use of pipelining is well established in 
> the  sector, with applications like Pipeline Pilot and Knime, and so 
> many of the common needs have been well established over several years 
> by these applications.
Have you also looked at Pentaho?
>
> Key issues that my initial investigations of Jakarta Pipeline seem to 
> identify are:
>
> 1. Branching is very common. This typically takes 2 forms:
> 1.1. Splitting data. A stage could (for instance) have 2 output ports, 
> "pass" and "fail". Data is processed by the stage and sent to 
> whichever port is appropriate. Different stages would be attached to 
> each port, resulting in the pipeline being branched by this pass/fail 
> decision.
> 1.2. Attaching multiple stages to a particular output port.
> The stage just sends its output onwards. It has no interest in what 
> happens once the data is sent, and is not concerned whether zero, one 
> or  100 stages receive the output. This is the stage1,2,3,4 scenario I 
> outlined previously.
>
> 2. Merging is also common (though less common than branching).
> By analogy with branching, I would see this conceptually as a stage 
> having multiple input ports (A and B in the merging example).
>
At present, the structure for storing stages is a linked list, and 
branches are implemented as additional pipelines accessed by a name 
through a HashMap. To generally handle branching and merging, a directed 
acyclic graph (DAG) would better serve, but that would require the 
pipeline code to be rewritten at this level. Arguments could also be 
made for allowing cycles, as in directed graphs, but that would be 
harder to debug, and with a GUI might be a step toward a visual 
programming language--so I don't think this should be pursued yet unless 
there are volunteers...

>
> Taken together I can see a generalisation here using named ports 
> (input and output), which is similar, but not identical, to your 
> current concept of branches.
>
> So you have:
> BaseStage.emit(String branch, Object obj);
> whereas I would conceptually see this as:
> emit(String port, Object obj);
> and you have:
> Stage.process(Object obj);
> whereas I would conceptually see this as:
> Stage.process(String port, Object obj);
>
> And when a pipeline is being assembled a downstream stage is attached 
> to a particular port of a stage, not the stage itself. It then just 
> receives data sent to that particular port, but not the other ports.
I could see that this would work, but would need either modifying a 
number of stages already written, or maybe creating a compatibility 
stage driver that takes older style stages so that the input object 
comes from a configured port name, usually "input", and sends the 
output to configured output ports named "output" and whatever the 
previous branch name(s) were, if any. Stages that used to look for 
events for input should be rewritten to read multiple inputs ( 
Stage.process(String port, Object obj) as you suggested). Events would 
then be reserved for truly out-of-band signals between stages rather 
than carrying data for processing.
>
> I'd love to hear how compatible the current system is with this way of 
> seeing things. Are we just talking about a new type of Stage 
> implementation, or a more fundamental incompatibility at the API level.
>
I think you have some good ideas. This is changing the Stage 
implementation, which affects on the order of 60 stages for us that 
override the process method, unless the compatibility stage driver works 
out. The top level pipeline would also be restructured. The amount of 
work required puts this out of the near term for me to work on it, but 
there may be other developers/contributors to take this on.

-Ken




Re: [PIPELINE] Questions about pipeline

Posted by Tim Dudgeon <td...@informaticsmatters.com>.
Hi Ken,

Thanks for the rapid response.
First, let me explain some background here.
I am looking for Java based pipelining solutions to incorporate into an 
existing application. The use of pipelining is well established in the 
  sector, with applications like Pipeline Pilot and Knime, and so many 
of the common needs have been well established over several years by 
these applications.

Key issues that my initial investigations of Jakarta Pipeline seem to 
identify are:

1. Branching is very common. This typically takes 2 forms:
1.1. Splitting data. A stage could (for instance) have 2 output ports, 
"pass" and "fail". Data is processed by the stage and sent to whichever 
port is appropriate. Different stages would be attached to each port, 
resulting in the pipeline being branched by this pass/fail decision.
1.2. Attaching multiple stages to a particular output port.
The stage just sends its output onwards. It has no interest in what 
happens once the data is sent, and is not concerned whether zero, one or 
100 stages receive the output. This is the stage1,2,3,4 scenario I 
outlined previously.

2. Merging is also common (though less common than branching).
By analogy with branching, I would see this conceptually as a stage 
having multiple input ports (A and B in the merging example).


Taken together I can see a generalisation here using named ports (input 
and output), which is similar, but not identical, to your current concept 
of branches.

So you have:
BaseStage.emit(String branch, Object obj);
whereas I would conceptually see this as:
emit(String port, Object obj);
and you have:
Stage.process(Object obj);
whereas I would conceptually see this as:
Stage.process(String port, Object obj);

And when a pipeline is being assembled a downstream stage is attached to 
a particular port of a stage, not the stage itself. It then just 
receives data sent to that particular port, but not the other ports.
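
To illustrate the idea, here is a minimal sketch of port-based routing. PortStage and PortEmitter are hypothetical names, not the current Commons Pipeline API; only the emit(String, Object) and process(String, Object) shapes come from the description above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical port-aware stage: receives data on a named input port. */
interface PortStage {
    void process(String port, Object obj);
}

/** Routes emitted objects to whatever is attached to the named output port. */
final class PortEmitter {
    /** A downstream stage plus the input port it should receive on. */
    private static final class Target {
        final PortStage stage;
        final String inputPort;

        Target(PortStage stage, String inputPort) {
            this.stage = stage;
            this.inputPort = inputPort;
        }
    }

    // Output port name -> everything attached to that port.
    private final Map<String, List<Target>> targets = new HashMap<>();

    /** Attaches a downstream stage to one output port, not to the stage as a whole. */
    void attach(String outputPort, PortStage downstream, String inputPort) {
        targets.computeIfAbsent(outputPort, k -> new ArrayList<>())
               .add(new Target(downstream, inputPort));
    }

    /** Delivers obj to every stage attached to the port; zero listeners is fine. */
    void emit(String port, Object obj) {
        for (Target t : targets.getOrDefault(port, Collections.<Target>emptyList())) {
            t.stage.process(t.inputPort, obj);
        }
    }
}
```

With this shape the emitting stage neither knows nor cares whether zero, one or many stages are attached to "pass" or "fail", which covers both forms of branching described in point 1.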

I'd love to hear how compatible the current system is with this way of 
seeing things. Are we just talking about a new type of Stage 
implementation, or a more fundamental incompatibility at the API level?


Many thanks.

Tim



Ken Tanaka wrote:
> 
> 
> Tim Dudgeon wrote:
>> Ken Tanaka wrote:
>>> The Pipeline Basics tutorial has now been incorporated into the 
>>> project page. Thanks to some help and cleanup from Rahul Akolkar the 
>>> documentation submitted was installed quickly. See
>>>
>>> http://commons.apache.org/sandbox/pipeline/pipeline_basics.html
>>>
>>> -Ken
>>>
>>
>> That documentation is really useful. Thanks!
>>
> Wow, someone is actually looking at this. I'll work on cleaning up the 
> documentation some. I hope people realize that some of the color-coded 
> examples got some inadvertent newlines added--but this isn't relevant to 
> your questions.
>> Could I follow up one of the earlier questions in this thread on 
>> branching and merging.
>>
>>
>> From those docs it looks to me like the way data was set to a branch 
>> is a bit strange. There appears to be a FileReaderStage class that has 
>> Java bean property called htmlPipelineKey:
>> <stage className="com.demo.pipeline.stages.FileReaderStage"
>> driverFactoryId="df1" htmlPipelineKey="sales2html"/>
>>
>> and later in the pipeline a branch is defined that names the pipeline 
>> according to that name:
>> <pipeline key="sales2html">
>>
>> This seems pretty inflexible to me. Any branches have to be hardcoded 
>> into the stage definition. I was expecting a situation where multiple 
>> stages could be the recipients of the output of any stage, and these 
>> can be "wired up" dynamically. e.g. something like this:
>>
>>
>>          |--stage2
>>          |
>> stage1---+--stage3
>>          |
>>          |--stage4
>>
>> so that all you needed to do was to define a stage5 as one more 
>> downstream stage for stage 1 and it would transparently receive the data.
>>
>> Is this possible, or does the branching have to be hard-coded into the 
>> stage definition?
> I wouldn't call the way branches are specified "hard coding", since the 
> xml file here is a configuration file. For our current use, branches are 
> pretty rare, so the pipeline framework deals best with simple cases that 
> are fairly linear. Also, if stage1 is a branching stage, then that stage 
> was written with branching in mind, and the "htmlPipelineKey" is a 
> hard-coded property name in the stage source code, so it can direct 
> output when it passes data out to the framework. To simplify matters, 
> all your branching stages could follow a convention of using "branchKey" 
> (or some other generic name), then you wouldn't have to remember what 
> variable holds the branch name for which stage.
> 
> A stage could be written to take an arbitrary number of branch names, 
> and thus send output down multiple branches, although it can get 
> complicated configuring rules on what goes where if the same thing isn't 
> going to all the branches. So rather than making stage1 a branching 
> stage, it could be followed by "stageMulti", which would send copies of 
> its input to a number of outputs:
> 
>                  |-----stage2
>                  |
> stage1----stageMulti----stage3
>                  |
>                  |-----stage4
> 
> stageMulti could then be used to add branching to any stage it follows.
> 
> I can imagine making configuration files a little simpler with regards 
> to setting up branching, but the more intelligent configuration file 
> reader to handle that hasn't been written.
>>
>>
>> Similarly for merging. To follow up the previous question, let's say I 
>> had stageA that output some A's and stageB that output some B's (let's 
>> assume both A's and B's are simple numbers). Now I wanted to have a 
>> stageC that takes all A's and all B's and generates some output with 
>> them (let's assume the output is A * B, so that every combination of A * 
>> B is output). So this would look like this:
>>
>> stageA--+
>>         |
>>         |----stageC
>>         |
>> stageB--+
>>
>> Is it possible to do this, so that stageA and stageB are both writing 
>> to stageC, but that stageC can distinguish the 2 different streams of 
>> data?
>>
>>
> First off, the current design expects all pipelines to start with one 
> stage, to accept feed values out of the config file (or place command 
> line arguments into the first stage queue if the main pipeline 
> application has been written to do that). So maybe you have a stageInit 
> which takes a single number like "3"
> 
> feed "3" --> stageInit----stageA
>                |
>                ----------stageB
> 
> stageInit can then pass "3" on to stageA and stageB, possibly causing 
> stageA to create 3 2-digit numbers and stageB to create 3 3-digit numbers.
> 
> For merging, stageC will accept normal input from a stage as well as 
> watch for events carrying additional data. stageC may well have to 
> accumulate input and then produce output as events are received. Stages 
> normally accept one input, which is either a feed or the output of the 
> stage immediately preceding them. Input from elsewhere or from more than 
> one source is currently handled as events raised by the source and 
> received by a "notify" method in the receiving stage.
> 
> feed "3" --> stageInit----stageA-------------stageC --> 10*111, 10*222, 
> 10*333, 20*111, 20*222, 20*333, 30*111....
>                |      3          10, 20, 30    :
>                ----------stageB................:
>                       3          111, 222, 333
> ---- normal data flow
> .... event passed data
> 
> Like branching, for our uses merging is rare. Also beware of running out 
> of memory if you are doing any accumulation of data to merge input from 
> more than one stage.
> 
> -Ken




Re: [PIPELINE] Questions about pipeline

Posted by Ken Tanaka <Ke...@noaa.gov>.

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>> The Pipeline Basics tutorial has now been incorporated into the 
>> project page. Thanks to some help and cleanup from Rahul Akolkar the 
>> documentation submitted was installed quickly. See
>>
>> http://commons.apache.org/sandbox/pipeline/pipeline_basics.html
>>
>> -Ken
>>
>
> That documentation is really useful. Thanks!
>
Wow, someone is actually looking at this. I'll work on cleaning up the 
documentation some. I hope people realize that some of the color-coded 
examples got some inadvertent newlines added--but this isn't relevant to 
your questions.
> Could I follow up one of the earlier questions in this thread on 
> branching and merging.
>
>
> From those docs it looks to me like the way data was set to a branch 
> is a bit strange. There appears to be a FileReaderStage class that has 
> Java bean property called htmlPipelineKey:
> <stage className="com.demo.pipeline.stages.FileReaderStage"
> driverFactoryId="df1" htmlPipelineKey="sales2html"/>
>
> and later in the pipeline a branch is defined that names the pipeline 
> according to that name:
> <pipeline key="sales2html">
>
> This seems pretty inflexible to me. Any branches have to be hardcoded 
> into the stage definition. I was expecting a situation where multiple 
> stages could be the recipients of the output of any stage, and these 
> can be "wired up" dynamically. e.g. something like this:
>
>
>          |--stage2
>          |
> stage1---+--stage3
>          |
>          |--stage4
>
> so that all you needed to do was to define a stage5 as one more 
> downstream stage for stage 1 and it would transparently receive the data.
>
> Is this possible, or does the branching have to be hard-coded into the 
> stage definition?
I wouldn't call the way branches are specified "hard coding", since the 
xml file here is a configuration file. For our current use, branches are 
pretty rare, so the pipeline framework deals best with simple cases that 
are fairly linear. Also, if stage1 is a branching stage, then that stage 
was written with branching in mind, and the "htmlPipelineKey" is a 
hard-coded property name in the stage source code, so it can direct 
output when it passes data out to the framework. To simplify matters, 
all your branching stages could follow a convention of using "branchKey" 
(or some other generic name), then you wouldn't have to remember what 
variable holds the branch name for which stage.

A stage could be written to take an arbitrary number of branch names, 
and thus send output down multiple branches, although it can get 
complicated configuring rules on what goes where if the same thing isn't 
going to all the branches. So rather than making stage1 a branching 
stage, it could be followed by "stageMulti", which would send copies of 
its input to a number of outputs:

                  |-----stage2
                  |
stage1----stageMulti----stage3
                  |
                  |-----stage4

stageMulti could then be used to add branching to any stage it follows.

I can imagine making configuration files a little simpler with regards 
to setting up branching, but the more intelligent configuration file 
reader to handle that hasn't been written.
>
>
> Similarly for merging. To follow up the previous question, let's say I 
> had stageA that output some A's and stageB that output some B's (let's 
> assume both A's and B's are simple numbers). Now I wanted to have a 
> stageC that takes all A's and all B's and generates some output with 
> them (let's assume the output is A * B, so that every combination of A * 
> B is output). So this would look like this:
>
> stageA--+
>         |
>         |----stageC
>         |
> stageB--+
>
> Is it possible to do this, so that stageA and stageB are both writing 
> to stageC, but that stageC can distinguish the 2 different streams of 
> data?
>
>
First off, the current design expects all pipelines to start with one 
stage, to accept feed values out of the config file (or place command 
line arguments into the first stage queue if the main pipeline 
application has been written to do that). So maybe you have a stageInit 
which takes a single number like "3"

feed "3" --> stageInit----stageA
                |
                ----------stageB

stageInit can then pass "3" on to stageA and stageB, possibly causing 
stageA to create 3 2-digit numbers and stageB to create 3 3-digit numbers.

For merging, stageC will accept normal input from a stage as well as 
watch for events carrying additional data. stageC may well have to 
accumulate input and then produce output as events are received. Stages 
normally accept one input, which is either a feed or the output of the 
stage immediately preceding them. Input from elsewhere or from more than 
one source is currently handled as events raised by the source and 
received by a "notify" method in the receiving stage.

feed "3" --> stageInit----stageA-------------stageC --> 10*111, 10*222, 
10*333, 20*111, 20*222, 20*333, 30*111....
                |      3          10, 20, 30    :
                ----------stageB................:
                       3          111, 222, 333
---- normal data flow
.... event passed data

Like branching, for our uses merging is rare. Also beware of running out 
of memory if you are doing any accumulation of data to merge input from 
more than one stage.
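
As a rough illustration of the accumulation stageC would need, here is a sketch that pairs every A with every B exactly once, whichever order they arrive in. CrossProductMerger, onA and onB are hypothetical names; in a real stage onA would be driven from process() and onB from the "notify" method that receives the events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical merge helper: multiplies every A with every B exactly once. */
final class CrossProductMerger {
    private final List<Long> seenA = new ArrayList<>();
    private final List<Long> seenB = new ArrayList<>();
    private final List<Long> output = new ArrayList<>();

    /** Value arriving on the normal data path (from stageA). */
    void onA(long a) {
        // Pair the new A with every B that has already arrived.
        for (long b : seenB) {
            output.add(a * b);
        }
        seenA.add(a);
    }

    /** Value arriving as an event payload (from stageB). */
    void onB(long b) {
        // Pair the new B with every A that has already arrived.
        for (long a : seenA) {
            output.add(a * b);
        }
        seenB.add(b);
    }

    List<Long> output() {
        return output;
    }
}
```

Both seenA and seenB grow without bound here, which is exactly the memory concern mentioned above. The order of the products depends on arrival order, so it will not necessarily match the listing in the diagram.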

-Ken



Re: [PIPELINE] Questions about pipeline

Posted by Tim Dudgeon <td...@informaticsmatters.com>.
Ken Tanaka wrote:
> The Pipeline Basics tutorial has now been incorporated into the project 
> page. Thanks to some help and cleanup from Rahul Akolkar the 
> documentation submitted was installed quickly. See
> 
> http://commons.apache.org/sandbox/pipeline/pipeline_basics.html
> 
> -Ken
> 

That documentation is really useful. Thanks!

Could I follow up one of the earlier questions in this thread on 
branching and merging.


 From those docs it looks to me like the way data is sent to a branch is 
a bit strange. There appears to be a FileReaderStage class that has a Java 
bean property called htmlPipelineKey:
<stage className="com.demo.pipeline.stages.FileReaderStage"
driverFactoryId="df1" htmlPipelineKey="sales2html"/>

and later in the pipeline a branch is defined that names the pipeline 
according to that name:
<pipeline key="sales2html">

This seems pretty inflexible to me. Any branches have to be hardcoded 
into the stage definition. I was expecting a situation where multiple 
stages could be the recipients of the output of any stage, and these can 
be "wired up" dynamically. e.g. something like this:


          |--stage2
          |
stage1---+--stage3
          |
          |--stage4

so that all you needed to do was to define a stage5 as one more 
downstream stage for stage 1 and it would transparently receive the data.

Is this possible, or does the branching have to be hard-coded into the 
stage definition?


Similarly for merging. To follow up the previous question, let's say I had 
stageA that output some A's and stageB that output some B's (let's 
assume both A's and B's are simple numbers). Now I wanted to have a 
stageC that takes all A's and all B's and generates some output with 
them (let's assume the output is A * B, so that every combination of A * B 
is output). So this would look like this:

stageA--+
         |
         |----stageC
         |
stageB--+

Is it possible to do this, so that stageA and stageB are both writing to 
stageC, but that stageC can distinguish the 2 different streams of data?


Many thanks.

Tim





