Posted to user@storm.apache.org by "J.R. Pauley" <jr...@gmail.com> on 2017/08/03 15:32:10 UTC

stream .each confusion

I'm scratching my head trying to produce a simple drpc stream that does
what I want. Logically I think I want this:

     tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("args","B"), new Fn2(), new Fields("args", "C"))
        .each(new Fields("args","C"), new Fn3(), new Fields("D"));

However I get a runtime exception for duplicated args, so I renamed the
args to give them all unique names, like so:

      tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new NormalizeFn(), new Fields("dpc"))
        .each(new Fields("args2","B"), new Fn2(), new Fields("args3", "C"))
        .each(new Fields("args3","C"), new Fn3(), new Fields("D"));

This works (sort of). The problem is that I end up with an output stream
containing lots of duplicate args. I don't want the original "args" emitted in
the output stream at all. But if I emit D I still see args2,args3,D in the output.

All I am trying to do is:
1) make args available to all 3 named functions,
2) supply additional arg B as input to Fn2, and
3) supply additional arg C as input to Fn3, which outputs D and only D as the
result

I've read that I don't need to define new "args" as they are passed to all
functions. However, if I try to access tuple.getString(1) in Fn2 I get an
ArrayOutOfBounds unless I am explicitly passing in 2 named parameters.

So I'm really confused as to how best to define this topology. Any help
appreciated.

Re: stream .each confusion

Posted by Bobby Evans <ev...@yahoo-inc.com>.
Did you see my updated topology? I only removed "args" from the output fields of Fn2(). I left it in the input fields of Fn2 and Fn3.


- Bobby


On Thursday, August 3, 2017, 10:55:04 AM CDT, J.R. Pauley <jr...@gmail.com> wrote:

Bobby,
sorry if I am misreading you, but if I don't explicitly declare "args" then Fn2 execute has only the single parameter "B" tuple.getString(0) as its input. I am not able to call tuple.getString(1) and tuple.size() =1, not 2. So below can't be right. How do I access "args" in Fn2?

tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("B"), new Fn2(), new Fields("C"))
        .each(new Fields("C"), new Fn3(), new Fields("D"));

On Thu, Aug 3, 2017 at 11:36 AM, Bobby Evans <ev...@yahoo-inc.com> wrote:

With trident the original args going into the each never go away, so you don't have to output them.

     tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("args","B"), new Fn2(), new Fields("C"))
        .each(new Fields("args","C"), new Fn3(), new Fields("D"));

By removing the "args" being output from Fn2() it should work for you.  I know it is a bit odd but that is how it works.

- Bobby





Re: stream .each confusion

Posted by "J.R. Pauley" <jr...@gmail.com>.
Bobby,

Sorry if I am misreading you, but if I don't explicitly declare "args", then
Fn2's execute() receives only the single field "B" (tuple.getString(0)) as its
input. I cannot call tuple.getString(1), and tuple.size() is 1, not 2.
So the topology below can't be right. How do I access "args" in Fn2?

tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("B"), new Fn2(), new Fields("C"))
        .each(new Fields("C"), new Fn3(), new Fields("D"));

On Thu, Aug 3, 2017 at 11:36 AM, Bobby Evans <ev...@yahoo-inc.com> wrote:

> With trident the original args going into the each never go away, so you
> don't have to output them.
>
>      tridentTopology
>         .newDRPCStream("crmc", null)
>         .each(new Fields("args"), new Fn1(), new Fields("B"))
>         .each(new Fields("args","B"), new Fn2(), new Fields("C"))
>         .each(new Fields("args","C"), new Fn3(), new Fields("D"));
>
> By removing the "args" being output from Fn2() it should work for you.  I
> know it is a bit odd but that is how it works.
>
> - Bobby

Re: stream .each confusion

Posted by Bobby Evans <ev...@yahoo-inc.com>.
With Trident, the original args going into the each() never go away, so you don't have to output them.

     tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("args","B"), new Fn2(), new Fields("C"))
        .each(new Fields("args","C"), new Fn3(), new Fields("D"));

By removing the "args" being output from Fn2(), it should work for you. I know it is a bit odd, but that is how it works.

- Bobby
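
The field behaviour described in this thread can be modelled in a few lines of plain Java. The sketch below is not Storm code: EachModel, its each() and project() methods, and the lambdas standing in for Fn1, Fn2 and Fn3 are hypothetical names invented for illustration. It assumes, as the thread reports, that a function sees only the fields listed as its inputs (in that order), that its outputs are appended to the tuple rather than replacing it, and that re-declaring an existing field name fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of the each() field bookkeeping discussed above.
// Hypothetical class, not part of Storm or Trident.
class EachModel {
    // One tuple: field name -> value, kept in declaration order.
    final Map<String, String> fields = new LinkedHashMap<>();

    EachModel(String name, String value) {
        fields.put(name, value);
    }

    // Models each(inputs, fn, outputs): fn sees only the selected inputs,
    // and whatever it emits is appended under the output field names.
    EachModel each(List<String> inputs,
                   Function<List<String>, List<String>> fn,
                   List<String> outputs) {
        // Assumption: an output name that already exists is rejected,
        // mirroring the "duplicated args" failure from the original post.
        for (String name : outputs) {
            if (fields.containsKey(name)) {
                throw new IllegalArgumentException("duplicate field: " + name);
            }
        }
        // Build the view handed to the function: index 0 is the first
        // input field listed, index 1 the second, and so on.
        List<String> view = new ArrayList<>();
        for (String name : inputs) {
            if (!fields.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            view.add(fields.get(name));
        }
        List<String> emitted = fn.apply(view);
        for (int i = 0; i < outputs.size(); i++) {
            fields.put(outputs.get(i), emitted.get(i));
        }
        return this;
    }

    // Models a projection: drop every field that is not listed.
    EachModel project(List<String> keep) {
        fields.keySet().retainAll(keep);
        return this;
    }

    // Same shape as the corrected topology, with a final projection onto D.
    static EachModel run(String args) {
        return new EachModel("args", args)
            .each(Arrays.asList("args"),
                  t -> Arrays.asList(t.get(0) + "-b"),
                  Arrays.asList("B"))
            .each(Arrays.asList("args", "B"),
                  t -> Arrays.asList(t.get(0) + "+" + t.get(1)),
                  Arrays.asList("C"))
            .each(Arrays.asList("args", "C"),
                  t -> Arrays.asList(t.get(0) + "+" + t.get(1)),
                  Arrays.asList("D"))
            .project(Arrays.asList("D"));
    }
}
```

In this model the stand-in for Fn2 reads args at index 0 and B at index 1 because both are listed as inputs, while "args" is never listed as an output. To end up with D alone, the analogous step in a real Trident topology would presumably be a projection after the last each(), for example project(new Fields("D")); check that against the Storm version in use.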

