Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/11/03 20:19:53 UTC

TridentWordCount running on cluster

Hi, All

I am running the TridentWordCount example from storm-starter:

  public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"),
        new Values("to be or not to be the person"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
        .parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .parallelismHint(16);

    topology.newDRPCStream("words", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
        .each(new Fields("count"), new FilterNull())
        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    return topology.build();
  }

  public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
  }
}

Running in local mode, it prints the count results to the screen, like:
DRPC RESULT: [[48]]
DRPC RESULT: [[107]]
DRPC RESULT: [[161]]
DRPC RESULT: [[215]]
DRPC RESULT: [[270]]
DRPC RESULT: [[330]]
......
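
For anyone reading along, each number above is the sum of the current counts for the words in the query "cat the dog jumped". The following is a rough sketch in plain Java, without Storm, of what the two streams compute. The class and method names (WordCountSketch, countWords, query) are made up for illustration and are not part of storm-starter; the sketch also ignores batching and parallelism.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a single-threaded stand-in for the Trident topology.
class WordCountSketch {
    // The five sentences the FixedBatchSpout cycles through.
    static final List<String> SENTENCES = Arrays.asList(
        "the cow jumped over the moon",
        "the man went to the store and bought some candy",
        "four score and seven years ago",
        "how many apples can you eat",
        "to be or not to be the person");

    // Stand-in for Split + groupBy + Count: tally every word,
    // going through the sentence list the given number of times.
    static Map<String, Long> countWords(List<String> sentences, int passes) {
        Map<String, Long> counts = new HashMap<>();
        for (int p = 0; p < passes; p++) {
            for (String sentence : sentences) {
                for (String word : sentence.split(" ")) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // Stand-in for the DRPC stream: look up each query word, skip words
    // that have no count (the FilterNull step), and sum the rest.
    static long query(Map<String, Long> counts, String args) {
        long sum = 0;
        for (String word : args.split(" ")) {
            Long count = counts.get(word);
            if (count != null) {
                sum += count;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(SENTENCES, 1);
        // One full pass: "the" occurs 5 times, "jumped" once, "cat" and "dog" never.
        System.out.println("RESULT: [[" + query(counts, "cat the dog jumped") + "]]");
    }
}
```

After one pass over the sentences this prints RESULT: [[6]]. In the real topology the spout cycles forever, so the sum keeps growing between DRPC calls, which is why the numbers above increase.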

However, when I submit the topology to the Storm cluster, I can see it in the
Storm UI (see attachment), but it reports 0 workers even though I call
setNumWorkers(3). How can I see the results or the status of a topology
running on the cluster? It seems I lose all monitoring once it is submitted.


thanks

Alec

Re: TridentWordCount running on cluster

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Hi Alec,

Looking at the Storm UI screenshot, it appears you have no supervisor nodes in your cluster, and thus no worker slots available to any of your topologies.

What is your cluster setup?

-Taylor
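
To make the point about worker slots concrete: each supervisor offers a fixed number of slots (one per port listed under supervisor.slots.ports in its storm.yaml), and a topology cannot be given more workers than there are free slots. The following is a toy model in plain Java, not Storm's actual scheduler; WorkerSlotSketch and assignedWorkers are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: a toy model of "requested workers vs. free slots".
class WorkerSlotSketch {
    // Hypothetical helper: how many workers a topology can actually get,
    // given the number requested via setNumWorkers and the number of
    // free slots on each registered supervisor.
    static int assignedWorkers(int requested, List<Integer> freeSlotsPerSupervisor) {
        int freeSlots = 0;
        for (int slots : freeSlotsPerSupervisor) {
            freeSlots += slots;
        }
        return Math.min(requested, freeSlots);
    }

    public static void main(String[] args) {
        // No supervisors registered, as in the screenshot: prints 0.
        System.out.println(assignedWorkers(3, Collections.<Integer>emptyList()));
        // One supervisor with four free slots: prints 3.
        System.out.println(assignedWorkers(3, Arrays.asList(4)));
    }
}
```

With no supervisors the free-slot total is zero, so the topology shows up in the UI but gets 0 workers regardless of what setNumWorkers requested.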



Re: TridentWordCount running on cluster

Posted by Sa Li <sa...@gmail.com>.
Sorry for the naive question, but I really do not know how to look at the
results after a topology has been submitted to the cluster. I feel the Storm
UI doesn't provide sufficient information. One of my topologies runs in local
mode and is able to insert tuples into a PostgreSQL DB, but when I run this
topology in cluster mode the PostgreSQL DB is not updated. I can't see any
hints in the Storm UI, and even tailing the nimbus log doesn't show anything
useful.


thanks

Alec
