Posted to user@storm.apache.org by Manuel Dossinger <do...@cs.uni-kl.de> on 2019/05/14 08:38:24 UTC

ConsoleStormReporter on LocalCluster

Hi everybody,

I tried to use the "new metrics reporting api" as shown here: http://storm.apache.org/releases/1.2.2/metrics_v2.html, however I had no luck.

The following is an example that should work according to my understanding of the documentation. It constructs a topology consisting of a single spout that emits a 1 every second.
This 1 is received by a bolt which just prints some string to stdout and increments a counter as described in the metrics reporting page above.
In the main method I add a config entry that should wire up the reporter. However, when starting the program, I only see the printlns from the bolt, and nothing from the reporter.

I also tested this on a real cluster with a CSVReporter, but got no output there either.

What am I doing wrong?


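```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.codahale.metrics.Counter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class MetricTest {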
    public static void main(String... args) {
        System.out.println("Building Topology...");
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("s", new DummySpout());
        builder.setBolt("b", new DummyBolt())
                .shuffleGrouping("s");

        Config config = new Config();
        List<Map<String, Object>> reportersConfig = new ArrayList<>();
        Map<String, Object> consoleReporterConfig = new HashMap<>();
        consoleReporterConfig.put("class", "org.apache.storm.metrics2.reporters.ConsoleStormReporter");
        List<String> daemonList = new ArrayList<>();
        daemonList.add("worker");
        daemonList.add("nimbus");
        daemonList.add("supervisor");
        consoleReporterConfig.put("daemons", daemonList);
        consoleReporterConfig.put("report.period", 5);
        consoleReporterConfig.put("report.period.units", "SECONDS");
        reportersConfig.add(consoleReporterConfig);

        config.put("storm.metrics.reporters", reportersConfig);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("t", config, builder.createTopology());
        Utils.sleep(60_000);
        cluster.killTopology("t");
        cluster.shutdown();
    }
}

class DummySpout extends BaseRichSpout {
    SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        collector.emit(new Values(1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("x"));
    }
}

class DummyBolt extends BaseRichBolt {
    private Counter tupleCounter;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.tupleCounter = context.registerCounter("tupleCount");
    }

    @Override
    public void execute(Tuple input) {
        System.out.println("Received tuple with x=" + input.getValue(0));
        this.tupleCounter.inc();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
```


Best Regards

Manuel Dossinger
dossinger@cs.uni-kl.de


Re: ConsoleStormReporter on LocalCluster

Posted by Manuel Dossinger <do...@cs.uni-kl.de>.

> On 16. May 2019, at 20:06, Stig Rohde Døssing <st...@gmail.com> wrote:
> 
> I may be completely off base with this, but I think metrics reporters are configured in the cluster configuration, and not on a per-topology basis. Note how the documentation you linked sets the config in storm.yaml, rather than in the topology configuration.
> 
> Could you try putting the metrics configuration in the cluster configuration instead? On 1.x you would do this by making your LocalCluster via the Testing.withLocalCluster method. That method takes a MkClusterParam, which takes the cluster config map. Example here https://github.com/xumingming/storm-lib/blob/50394ff463da0d92ac4e07dee78a664a3a227e9c/src/jvm/storm/TestingApiDemo.java#L88
> 

This did it (at least for my 1.2.2 setup), thank you!

I thought this should be configured in the topology because the filter expression refers to my_component.

Btw, there's still an error in the documentation I linked: the CsvReporter is actually called CsvStormReporter. I'll send a PR.


Best
Manuel

Re: ConsoleStormReporter on LocalCluster

Posted by Stig Rohde Døssing <st...@gmail.com>.
I may be completely off base with this, but I think metrics reporters are
configured in the cluster configuration, and not on a per-topology basis.
Note how the documentation you linked sets the config in storm.yaml, rather
than in the topology configuration.

Could you try putting the metrics configuration in the cluster
configuration instead? On 1.x you would do this by making your LocalCluster
via the Testing.withLocalCluster method. That method takes a
MkClusterParam, which takes the cluster config map. Example here
https://github.com/xumingming/storm-lib/blob/50394ff463da0d92ac4e07dee78a664a3a227e9c/src/jvm/storm/TestingApiDemo.java#L88

In 2.x you can configure this more easily from Java using the
LocalCluster.Builder.
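
To make the suggestion above concrete, here is a minimal sketch of the same reporter entry from the original post, built as a daemon (cluster) config map rather than put into the topology Config. The class MetricsReporterConf and its methods are hypothetical helpers, not part of Storm. The Storm calls in the comments (MkClusterParam.setDaemonConf, Testing.withLocalCluster, LocalCluster.Builder.withDaemonConf) are what I believe the 1.x and 2.x APIs look like, but I have not run them here, so treat them as assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Storm): builds the cluster-level config map
// that carries the metrics reporter settings from the original post.
class MetricsReporterConf {

    // One reporter entry, with the same keys the original post used.
    static Map<String, Object> consoleReporter(int periodSeconds) {
        Map<String, Object> reporter = new HashMap<>();
        reporter.put("class", "org.apache.storm.metrics2.reporters.ConsoleStormReporter");
        reporter.put("daemons", Arrays.asList("worker", "nimbus", "supervisor"));
        reporter.put("report.period", periodSeconds);
        reporter.put("report.period.units", "SECONDS");
        return reporter;
    }

    // The map to hand to the cluster (daemon) configuration,
    // instead of the per-topology Config.
    static Map<String, Object> daemonConf(int periodSeconds) {
        List<Map<String, Object>> reporters = new ArrayList<>();
        reporters.add(consoleReporter(periodSeconds));
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.metrics.reporters", reporters);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = daemonConf(5);

        // Assumed wiring on 1.x (needs storm-core on the classpath, untested here):
        //   MkClusterParam param = new MkClusterParam();
        //   param.setDaemonConf(conf);
        //   Testing.withLocalCluster(param, cluster ->
        //       cluster.submitTopology("t", new Config(), builder.createTopology()));
        //
        // Assumed wiring on 2.x (untested here):
        //   LocalCluster cluster = new LocalCluster.Builder().withDaemonConf(conf).build();

        // Print the reporter list so the structure can be inspected.
        System.out.println(conf.get("storm.metrics.reporters"));
    }
}
```

The point of the sketch is only where the map ends up: the reporter list is the same as before, but it goes into the config the local daemons are started with, while the topology is submitted with a plain Config.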
