Posted to user@storm.apache.org by Thiago Souza <tc...@gmail.com> on 2014/10/16 16:25:55 UTC

Trying to learn Storm

I'm a Storm newbie trying to learn it. Could someone please help me?
Consider the following snippet:

     TridentTopology builder = new TridentTopology();

     // Random data generated with a random 'messageId'
     Stream inputStream = builder.newStream("data-supply",
             new RandomDataGenerator() /* this extends BaseRichSpout */);

     // Load 'Datatype' referenced by datatypeId from external source
     Stream datatypeStream = inputStream
             .each(new Fields("datatypeId"), new DatatypeLoader(), new Fields("datatype"))
             .project(new Fields("messageId", "datatype"));

     // Load 'Provider' referenced by providerId from external source
     Stream providerStream = inputStream
             .each(new Fields("providerId"), new ProviderLoader(), new Fields("provider"))
             .project(new Fields("messageId", "provider"));

     // Join both streams on original 'messageId'
     builder.join(providerStream, new Fields("messageId"),
                  datatypeStream, new Fields("messageId"),
                  new Fields("datatype", "provider"))
            .each(new Fields("datatype", "provider"), new Print()); // Print the result


I expected to get a Tuple with both 'Datatype' and 'Provider' previously
loaded, but what I get is a Tuple with only one of them, depending on which
stream is the first argument of the 'join' method (in this case 'providerStream').

Am I doing something wrong, or do I have a misconception of how Storm works?

Thanks,
Thiago Souza

Re: Re: Trying to learn Storm

Posted by "shengyi.pan" <sh...@gmail.com>.
Hi, all:

according to http://storm.apache.org/documentation/Trident-API-Overview.html

 A list of all non-join fields from all streams, in order of how the streams were passed to the join method

If your code is:

Stream datatypeStream = inputStream
        .each(new Fields("datatypeId"), new DatatypeLoader(), new Fields("datatype"))
        .project(new Fields("messageId", "datatype"));

Stream providerStream = inputStream
        .each(new Fields("providerId"), new ProviderLoader(), new Fields("provider"))
        .project(new Fields("messageId", "provider"));

// Join both streams on original 'messageId'
builder.join(providerStream, new Fields("messageId"),
             datatypeStream, new Fields("messageId"),
             new Fields("messageId", "datatype", "provider"))
       .each(new Fields("datatype", "provider"), new Print()); // Print the result

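then the field named "datatype" actually holds the "provider" value from providerStream, and the field named "provider" holds the "datatype" value from datatypeStream, because providerStream is passed to the join method first. Maybe it's a bug in your application...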
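A plain-Java illustration of this point (not Storm code): the join output values are named by position, so the sketch below pairs each declared output name with the value at the same position. `JoinFieldLabels` and its `label` helper are hypothetical, the tuple values are made up, and the length check is this sketch's own choice; it does not claim to reproduce what Trident does when the counts differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, no Storm classes: output names are bound to
// join output values by position, so a misordered name labels the wrong value.
public class JoinFieldLabels {

    // Pair each declared output name with the value at the same position.
    static Map<String, Object> label(List<String> names, List<Object> values) {
        if (names.size() != values.size()) {
            // This sketch's own check; not a statement about Trident's behavior.
            throw new IllegalArgumentException(
                "expected " + values.size() + " names, got " + names.size());
        }
        Map<String, Object> labeled = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            labeled.put(names.get(i), values.get(i));
        }
        return labeled;
    }

    public static void main(String[] args) {
        // Made-up values for one joined tuple. providerStream is passed first,
        // so its non-join value comes right after the join key.
        List<Object> joined = List.of("msg-1", "ProviderA", "TypeX");

        // Names in the order used above: "datatype" lands on the provider value.
        System.out.println(label(List.of("messageId", "datatype", "provider"), joined));
        // Names that follow the stream order label the values as intended.
        System.out.println(label(List.of("messageId", "provider", "datatype"), joined));
    }
}
```

The first line printed is `{messageId=msg-1, datatype=ProviderA, provider=TypeX}`, which is exactly the mislabeling described above; the second pairs each name with its own value.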

If I am wrong, please correct me. Thanks!


2014-10-17

shengyi.pan

From: Thiago Souza <tc...@gmail.com>
Sent: 2014-10-17 04:55
Subject: Re: Trying to learn Storm
To: "user"<us...@storm.apache.org>

Got it! Thanks! :)



Re: Trying to learn Storm

Posted by Thiago Souza <tc...@gmail.com>.
Got it! Thanks! :)

On Thu, Oct 16, 2014 at 4:26 PM, Chin Huang <ch...@mulu.me> wrote:

> The tuples emitted from the join must have these fields:
>
> 1. First, the fields you are joining on.
> 2. Next, the other fields, in the same order as the streams passed to the
> join method and in the same order as the input tuple fields.
>
> For your example, the join output tuple fields would be
>
>     new Fields("messageId", "datatype", "provider")

Re: Trying to learn Storm

Posted by Chin Huang <ch...@mulu.me>.
The tuples emitted from the join must have these fields:

1. First, the fields you are joining on.
2. Next, the other fields, in the same order as the streams passed to the
join method and in the same order as the input tuple fields.

For your example, the join output tuple fields would be

    new Fields("messageId", "datatype", "provider")
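The rule above can be written out as a plain-Java sketch (no Storm classes; `JoinLayout` and `outputLayout` are hypothetical names). It assumes every stream names its join field identically, as `messageId` does here, so the join field can be dropped from each stream's list by name. Trident itself takes the join fields separately for each stream, so that is a simplification. Applied to the two streams from the question, the rule yields `messageId, provider, datatype` when providerStream is passed first, as in the original snippet, and `messageId, datatype, provider` when datatypeStream is passed first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the join output layout rule; not Storm's API.
public class JoinLayout {

    // joinFields: the fields the streams are joined on (assumed to have the
    //             same name in every stream).
    // streams:    each input stream's fields, in the order the streams are
    //             passed to join().
    static List<String> outputLayout(List<String> joinFields, List<List<String>> streams) {
        // 1. The join fields come first.
        List<String> layout = new ArrayList<>(joinFields);
        // 2. Then each stream's remaining fields, stream by stream,
        //    keeping each stream's own field order.
        for (List<String> fields : streams) {
            for (String field : fields) {
                if (!joinFields.contains(field)) {
                    layout.add(field);
                }
            }
        }
        return layout;
    }

    public static void main(String[] args) {
        List<String> key = List.of("messageId");
        List<String> provider = List.of("messageId", "provider");
        List<String> datatype = List.of("messageId", "datatype");

        // providerStream passed first, as in the original snippet
        System.out.println(outputLayout(key, List.of(provider, datatype)));
        // datatypeStream passed first
        System.out.println(outputLayout(key, List.of(datatype, provider)));
    }
}
```

The output Fields passed to join() would then list names in the computed order, since they only rename those positions.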
