You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Priya Ganesan <ch...@hotmail.com> on 2014/01/21 05:50:31 UTC

HOW TO SPECIFY THE OUTPUT PATH IN STORM

Hi All,
We are currently working on a word count topology. We tried to give a text file as input, and the topology runs without error. But we don't know how or where to specify the output path. It would be very helpful if you could help us.
Our main program:
import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;
/**
 * Entry point that wires the word-count topology together and runs it on an
 * in-process {@link LocalCluster}.
 *
 * <p>Data flow: {@code word-reader} (spout) -> {@code word-normalizer} (bolt,
 * shuffle grouping) -> {@code word-counter} (bolt, 2 tasks, grouped on the
 * {@code "word"} field so every occurrence of a word reaches the same task).
 */
public class TopologyMain {

    /**
     * Builds the topology, submits it to a local cluster, lets it run for one
     * second and then shuts the cluster down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration: the spout reads the input file path from "wordsFile".
        Config conf = new Config();
        conf.put("wordsFile", "E:\\words.txt");
        conf.setDebug(false);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Topology run
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        try {
            // NOTE(review): one second may be too short for a large input file;
            // raise it if the printed counts look incomplete.
            Thread.sleep(1000);
        } finally {
            // Without an explicit shutdown the local cluster is never stopped.
            // The counter bolt's own comment says it reports its totals when the
            // cluster is shut down, so this is presumably what triggers that output.
            cluster.shutdown();
        }
    }
}
Spout program:

import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.FileReader;import java.io.*;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;public  class  WordReader implements IRichSpout {private SpoutOutputCollector collector;Map<String, Object> count;
private FileReader fileReader;private boolean completed = false;private TopologyContext context;public boolean isDistributed() {return false;}public void ack(Object msgId) {System.out.println("OK:"+msgId);}public void close() {}public void fail(Object msgId) {System.out.println("FAIL:"+msgId);}/*** The only thing that the methods will do It is emit each* file line*/public void nextTuple() {/*** The nextuple it is called forever, so if we have been readed the file* we will wait and then return*/if(completed){try {Thread.sleep(1000);} catch (InterruptedException e) {//Do nothing}return;}String str;//Open the readerBufferedReader reader = new BufferedReader(fileReader);try{//Read all lineswhile((str = reader.readLine()) != null){/*** By each line emmit a new value with the line as a their*/this.collector.emit(new Values(str),str);}}catch(Exception e){throw new RuntimeException("Error reading tuple",e);}finally{completed = true;}}/*** We will create the file and get the collector object*/public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {this.context = context;this.fileReader = new FileReader(conf.get("wordsFile").toString());} catch (FileNotFoundException e) {throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");}this.collector = collector;}/*** Declare the output field "word"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("line"));}public void deactivate(){}public void activate(){}public Map<String, Object> getComponentConfiguration(){return count;}}
Bolt - Word normalizer program:
import java.util.ArrayList;import java.util.List;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class WordNormalizer implements IRichBolt {private OutputCollector collector;Map<String, Object> count;public void cleanup() {}/*** The bolt will receive the line from the* words file and process it to Normalize this line** The normalize will be put the words in lower case* and split the line to get all words in this*/public void execute(Tuple input) {String sentence = input.getString(0);String[] words = sentence.split(" ");for(String word : words){word = word.trim();if(!word.isEmpty()){word = word.toLowerCase();//Emit the wordList a = new ArrayList();a.add(input);collector.emit(a,new Values(word));}}// Acknowledge the tuplecollector.ack(input);}public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;}/*** The bolt will only emit the field "word"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}public Map<String, Object> getComponentConfiguration(){return count;}}
Bolt - word counter program:
import java.util.HashMap;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class  WordCounter implements IRichBolt {Integer id;String name;Map<String, Integer> counters;Map<String, Object> count;private OutputCollector collector;/*** At the end of the spout (when the cluster is shutdown* We will show the word counters*/@Overridepublic void cleanup() {System.out.println("-- Word Counter ["+name+"-"+id+"] --");for(Map.Entry<String, Integer> entry : counters.entrySet()){System.out.println(entry.getKey()+": "+entry.getValue());}}/*** On each word We will count*/@Overridepublic void execute(Tuple input) {String str = input.getString(0);/*** If the word dosn't exist in the map we will create* this, if not We will add 1*/if(!counters.containsKey(str)){counters.put(str, 1);}else{Integer c = counters.get(str) + 1;counters.put(str, c);}//Set the tuple as Acknowledgecollector.ack(input);}/*** On create*/@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.counters = new HashMap<String, Integer>();this.collector = collector;this.name = context.getThisComponentId();this.id = context.getThisTaskId();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
public void deactivate(){}
public Map<String, Object> getComponentConfiguration(){return count;}
}

Thank You,Priya 		 	   		  

RE: HOW TO SPECIFY THE OUTPUT PATH IN STORM

Posted by Priya Ganesan <ch...@hotmail.com>.
Thanks for your kind reply, sir. Do we need to make any modification to the program to store the output in a file?
Subject: Re: HOW TO SPECIFY THE OUTPUT PATH IN STORM
From: xumingmingv@gmail.com
Date: Tue, 21 Jan 2014 12:55:56 +0800
CC: xumingmingv@gmail.com
To: user@storm.incubator.apache.org

There is no 「output path」concept in Storm, you need to store the result yourself in file/db etc.
On 2014年1月21日, at 下午12:50, Priya Ganesan <ch...@hotmail.com> wrote:Hi All,
We are currently working on word count topology. We tried to give a text file as input. We are running the topology without error. But we don't know how to specify the output path and where to specify the output path. It will be very helpful if we help us
Our main program:
import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;
public class TopologyMain {public static void main(String[] args) throws InterruptedException {//Topology definitionTopologyBuilder builder = new TopologyBuilder();builder.setSpout("word-reader",new WordReader());builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));//ConfigurationConfig conf = new Config();conf.put("wordsFile","E:\\words.txt");
conf.setDebug(false);//Topology runconf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("Getting-Started-Toplogie", conf,builder.createTopology());Thread.sleep(1000);
}}
Spout program:

import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.FileReader;import java.io.*;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;public  class  WordReader implements IRichSpout {private SpoutOutputCollector collector;Map<String, Object> count;
private FileReader fileReader;private boolean completed = false;private TopologyContext context;public boolean isDistributed() {return false;}public void ack(Object msgId) {System.out.println("OK:"+msgId);}public void close() {}public void fail(Object msgId) {System.out.println("FAIL:"+msgId);}/*** The only thing that the methods will do It is emit each* file line*/public void nextTuple() {/*** The nextuple it is called forever, so if we have been readed the file* we will wait and then return*/if(completed){try {Thread.sleep(1000);} catch (InterruptedException e) {//Do nothing}return;}String str;//Open the readerBufferedReader reader = new BufferedReader(fileReader);try{//Read all lineswhile((str = reader.readLine()) != null){/*** By each line emmit a new value with the line as a their*/this.collector.emit(new Values(str),str);}}catch(Exception e){throw new RuntimeException("Error reading tuple",e);}finally{completed = true;}}/*** We will create the file and get the collector object*/public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {this.context = context;this.fileReader = new FileReader(conf.get("wordsFile").toString());} catch (FileNotFoundException e) {throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");}this.collector = collector;}/*** Declare the output field "word"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("line"));}public void deactivate(){}public void activate(){}public Map<String, Object> getComponentConfiguration(){return count;}}
Bolt - Word normalizer program:
import java.util.ArrayList;import java.util.List;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class WordNormalizer implements IRichBolt {private OutputCollector collector;Map<String, Object> count;public void cleanup() {}/*** The bolt will receive the line from the* words file and process it to Normalize this line** The normalize will be put the words in lower case* and split the line to get all words in this*/public void execute(Tuple input) {String sentence = input.getString(0);String[] words = sentence.split(" ");for(String word : words){word = word.trim();if(!word.isEmpty()){word = word.toLowerCase();//Emit the wordList a = new ArrayList();a.add(input);collector.emit(a,new Values(word));}}// Acknowledge the tuplecollector.ack(input);}public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;}/*** The bolt will only emit the field "word"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}public Map<String, Object> getComponentConfiguration(){return count;}}
Bolt - word reader program:
import java.util.HashMap;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class  WordCounter implements IRichBolt {Integer id;String name;Map<String, Integer> counters;Map<String, Object> count;private OutputCollector collector;/*** At the end of the spout (when the cluster is shutdown* We will show the word counters*/@Overridepublic void cleanup() {System.out.println("-- Word Counter ["+name+"-"+id+"] --");for(Map.Entry<String, Integer> entry : counters.entrySet()){System.out.println(entry.getKey()+": "+entry.getValue());}}/*** On each word We will count*/@Overridepublic void execute(Tuple input) {String str = input.getString(0);/*** If the word dosn't exist in the map we will create* this, if not We will add 1*/if(!counters.containsKey(str)){counters.put(str, 1);}else{Integer c = counters.get(str) + 1;counters.put(str, c);}//Set the tuple as Acknowledgecollector.ack(input);}/*** On create*/@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.counters = new HashMap<String, Integer>();this.collector = collector;this.name = context.getThisComponentId();this.id = context.getThisTaskId();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
public void deactivate(){}
public Map<String, Object> getComponentConfiguration(){return count;}
}

Thank You,Priya
 		 	   		  

Re: HOW TO SPECIFY THE OUTPUT PATH IN STORM

Posted by James Xu <xu...@gmail.com>.
There is no "output path" concept in Storm; you need to store the result yourself (in a file, a database, etc.).

On 2014年1月21日, at 下午12:50, Priya Ganesan <ch...@hotmail.com> wrote:

> Hi All,
> 
> We are currently working on word count topology. We tried to give a text file as input. We are running the topology without error. But we don't know how to specify the output path and where to specify the output path. It will be very helpful if we help us
> 
> Our main program:
> 
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.tuple.Fields;
> 
> public class TopologyMain {
> public static void main(String[] args) throws InterruptedException {
> //Topology definition
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("word-reader",new WordReader());
> builder.setBolt("word-normalizer", new WordNormalizer())
> .shuffleGrouping("word-reader");
> builder.setBolt("word-counter", new WordCounter(),2)
> .fieldsGrouping("word-normalizer", new Fields("word"));
> //Configuration
> Config conf = new Config();
> conf.put("wordsFile","E:\\words.txt");
> 
> conf.setDebug(false);
> //Topology run
> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("Getting-Started-Toplogie", conf,
> builder.createTopology());
> Thread.sleep(1000);
> 
> }
> }
> 
> Spout program:
> 
> 
> import java.io.BufferedReader;
> import java.io.FileNotFoundException;
> import java.io.FileReader;
> import java.io.*;
> import java.util.Map;
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> public  class  WordReader implements IRichSpout {
> private SpoutOutputCollector collector;
> Map<String, Object> count;
> 
> private FileReader fileReader;
> private boolean completed = false;
> private TopologyContext context;
> public boolean isDistributed() {return false;}
> public void ack(Object msgId) {
> System.out.println("OK:"+msgId);
> }
> public void close() {}
> public void fail(Object msgId) {
> System.out.println("FAIL:"+msgId);
> }
> /**
> * The only thing that the methods will do It is emit each
> * file line
> */
> public void nextTuple() {
> /**
> * The nextuple it is called forever, so if we have been readed the file
> * we will wait and then return
> */
> if(completed){
> try {
> Thread.sleep(1000);
> } catch (InterruptedException e) {
> //Do nothing
> }
> return;
> }
> String str;
> //Open the reader
> BufferedReader reader = new BufferedReader(fileReader);
> try{
> //Read all lines
> while((str = reader.readLine()) != null){
> /**
> * By each line emmit a new value with the line as a their
> */
> this.collector.emit(new Values(str),str);
> }
> }catch(Exception e){
> throw new RuntimeException("Error reading tuple",e);
> }finally{
> completed = true;
> }
> }
> /**
> * We will create the file and get the collector object
> */
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> try {
> this.context = context;
> this.fileReader = new FileReader(conf.get("wordsFile").toString());
> } catch (FileNotFoundException e) {
> throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");
> }
> this.collector = collector;
> }
> /**
> * Declare the output field "word"
> */
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("line"));
> }
> public void deactivate(){}
> public void activate(){}
> public Map<String, Object> getComponentConfiguration(){return count;}
> }
> 
> Bolt - Word normalizer program:
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichBolt;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> public class WordNormalizer implements IRichBolt {
> private OutputCollector collector;
> Map<String, Object> count;
> public void cleanup() {}
> /**
> * The bolt will receive the line from the
> * words file and process it to Normalize this line
> *
> * The normalize will be put the words in lower case
> * and split the line to get all words in this
> */
> public void execute(Tuple input) {
> String sentence = input.getString(0);
> String[] words = sentence.split(" ");
> for(String word : words){
> word = word.trim();
> if(!word.isEmpty()){
> word = word.toLowerCase();
> //Emit the word
> List a = new ArrayList();
> a.add(input);
> collector.emit(a,new Values(word));
> }
> }
> // Acknowledge the tuple
> collector.ack(input);
> }
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.collector = collector;
> }
> /**
> * The bolt will only emit the field "word"
> */
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
> public Map<String, Object> getComponentConfiguration(){return count;}
> }
> 
> Bolt - word reader program:
> 
> import java.util.HashMap;
> import java.util.Map;
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichBolt;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.tuple.Tuple;
> public class  WordCounter implements IRichBolt {
> Integer id;
> String name;
> Map<String, Integer> counters;
> Map<String, Object> count;
> private OutputCollector collector;
> /**
> * At the end of the spout (when the cluster is shutdown
> * We will show the word counters
> */
> @Override
> public void cleanup() {
> System.out.println("-- Word Counter ["+name+"-"+id+"] --");
> for(Map.Entry<String, Integer> entry : counters.entrySet()){
> System.out.println(entry.getKey()+": "+entry.getValue());
> }
> }
> /**
> * On each word We will count
> */
> @Override
> public void execute(Tuple input) {
> String str = input.getString(0);
> /**
> * If the word dosn't exist in the map we will create
> * this, if not We will add 1
> */
> if(!counters.containsKey(str)){
> counters.put(str, 1);
> }else{
> Integer c = counters.get(str) + 1;
> counters.put(str, c);
> }
> //Set the tuple as Acknowledge
> collector.ack(input);
> }
> /**
> * On create
> */
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.counters = new HashMap<String, Integer>();
> this.collector = collector;
> this.name = context.getThisComponentId();
> this.id = context.getThisTaskId();
> }
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {}
> 
> public void deactivate(){}
> 
> public Map<String, Object> getComponentConfiguration(){return count;}
> 
> }
> 
> 
> Thank You,
> Priya